sharding-jdbc源碼解析之結果集歸併

結果集歸併源碼解析sql

本文轉自「天河聊技術」微信公衆號微信

 

找到這個方法,執行查詢的方法多線程

com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeQueryide

@Override
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
//            路由到預編譯對象執行單元集合
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
//            多線程執行sql查詢返回結果集對象集合
            List<ResultSet> resultSets = new PreparedStatementExecutor(
                    getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
//走結果計歸併引擎執行結果集歸併邏輯-》
            result = new ShardingResultSet(resultSets, new MergeEngine(
                    getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
        } finally {
            clearBatch();
        }
        setCurrentResultSet(result);
        return result;
    }

建立結果集歸併引擎對象,進入到構造器ui

/**
 * 分片結果集歸併引擎.
 *
 * @author zhangliang
 */
public final class MergeEngine {
    
    private final DatabaseType databaseType;
    
    private final List<ResultSet> resultSets;
    
    private final SelectStatement selectStatement;
    
    private final Map<String, Integer> columnLabelIndexMap;
    
    public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
        this.databaseType = databaseType;
        this.resultSets = resultSets;
        this.selectStatement = selectStatement;
        columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
    }

獲取結果集的源數據this

columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {
//        獲取resultSet的源數據
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
        }
        return result;
    }

進入到這個方法線程

com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#merge對象

/**
 * 合併結果集.
 *
 * @return 歸併完畢後的結果集
 * @throws SQLException SQL異常
 */
public ResultSetMerger merge() throws SQLException {//結果集合並業務方法
    selectStatement.setIndexForItems(columnLabelIndexMap);
    return decorate(build());
}

進入到這個方法排序

com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#build隊列

private ResultSetMerger build() throws SQLException {
//        排序項不爲空或者聚合選擇項不爲空
        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
//            若是分組項和排序項一致,走流式結果集歸併-》
            if (selectStatement.isSameGroupByAndOrderByItems()) {
                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            } else {
//                不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            }
        }
        if (!selectStatement.getOrderByItems().isEmpty()) {
            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());
        }
        return new IteratorStreamResultSetMerger(resultSets);
    }
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

 

建立分組流式結果集歸併,進入到構造器方法

public GroupByStreamResultSetMerger(
        final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {
    super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
    this.labelAndIndexMap = labelAndIndexMap;
    this.selectStatement = selectStatement;
    currentRow = new ArrayList<>(labelAndIndexMap.size());
    currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}

這一行代碼

super(resultSets, selectStatement.getOrderByItems(), nullOrderType);

建立排序流式結果集歸併對象

public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException {
        this.orderByItems = orderByItems;
//優先級隊列實現
        this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
        this.nullOrderType = nullOrderType;
//        把要排序的結果集往隊列裏放-》
        orderResultSetsToQueue(resultSets);
        isFirstNext = true;
    }
//        把要排序的結果集往隊列裏放-》
        orderResultSetsToQueue(resultSets);
private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
        for (ResultSet each : resultSets) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
//        流式結果集歸併,設置當前的流式歸併結果集,你們看這裏存儲是當前的結果集因此不會出現內存溢出問題
        setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
    }

返回到這個方法

private ResultSetMerger build() throws SQLException {
//        排序項不爲空或者聚合選擇項不爲空
        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
//            若是分組項和排序項一致,走流式結果集歸併-》
            if (selectStatement.isSameGroupByAndOrderByItems()) {
                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            } else {
//                不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            }
        }
        if (!selectStatement.getOrderByItems().isEmpty()) {
            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());
        }
        return new IteratorStreamResultSetMerger(resultSets);
    }

這一行

//                不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

建立內存分組結果集歸併對象

public GroupByMemoryResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {
        super(labelAndIndexMap);
        this.selectStatement = selectStatement;
        this.nullOrderType = nullOrderType;
//        建立內存結果集行對象
        memoryResultSetRows = init(resultSets);
    }

返回到這個方法

private ResultSetMerger build() throws SQLException {
//        排序項不爲空或者聚合選擇項不爲空
        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
//            若是分組項和排序項一致,走流式結果集歸併-》
            if (selectStatement.isSameGroupByAndOrderByItems()) {
                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            } else {
//                不然走內存結果集歸併,要儘可能避免這種狀況,會佔用大量內存=》
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
            }
        }
        if (!selectStatement.getOrderByItems().isEmpty()) {
            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());
        }
//        建立迭代歸併結果集歸併對象
        return new IteratorStreamResultSetMerger(resultSets);
    }

這一行

//        建立迭代歸併結果集歸併對象
        return new IteratorStreamResultSetMerger(resultSets);
public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
    this.resultSets = resultSets.iterator();
    setCurrentResultSet(this.resultSets.next());
}

返回到這個方法

/**
 * 合併結果集.
 *
 * @return 歸併完畢後的結果集
 * @throws SQLException SQL異常
 */
public ResultSetMerger merge() throws SQLException {//結果集合並業務方法
    selectStatement.setIndexForItems(columnLabelIndexMap);
    return decorate(build());
}

進入這個方法com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#decorate

private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
        ResultSetMerger result = resultSetMerger;
        if (null != selectStatement.getLimit()) {
//            裝飾器模式對分頁結果集歸併進行了進一步的封裝
            result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
        }
        return result;
    }

 

結果集歸併源碼解析到這裏就結束了。

 

 

說到最後

源碼解析的部份內容比較多,仍是直接看源碼比較直觀,我這裏的源碼解析只是給予你們一個閱讀源碼的思路,我已經儘可能把sharding-jdbc實現的關鍵點都介紹了下,可能還有其餘的好的實現沒有介紹出來,sharding-jdbc源碼解析系列到這裏就所有結束了,若是須要進一步深度溝通,請加我微信,我會把你拉到天河聊技術技術討論羣裏相互交流,以上源碼解析僅供參考。

相關文章
相關標籤/搜索