Shading - jdbc 源碼分析(七) - sql 歸併

主要類:

  • ResultSetFactory:分片結果集歸併工廠類,獲取組裝後的結果集(能夠理解爲原始的resultSet通過處理,生成的新的resultSet)
  • AbstractDelegateResultSet :代理結果集抽象類
  • IteratorReducerResultSet :迭代歸併的彙集結果集,對於多個resultset的結果進行迭代, 繼承AbstractDelegateResultSet
  • WrapperResultSet:ShardingResultSets 的內部類,對原生resultSet包了下,重寫了了firstNext()、afterFirstNext()方法
  • LimitCouplingResultSet: 分頁限制條件的鏈接結果集,用於須要對結果集作分頁處理的狀況,繼承AbstractDelegateResultSet
  • StreamingOrderByReducerResultSet:流式排序的彙集結果集,用於對結果集排序的處理,繼承AbstractDelegateResultSet

執行過程:

sql:sql

SELECT o.order_id FROM t_order o WHERE o.order_id in (1000,1200) order by user_id desc limit 10編程

  1. executeQuery:

調用ResultSetFactory,獲取組裝後的ResultSet,generateExecutor(sql).executeQuery() 屬於SQL執行部分,以前分析過,這裏就再也不說了數組

public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet result;
        try {
            result = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), routeResult.getSqlStatement());
        } finally {
            setCurrentResultSet(null);
        }
        setCurrentResultSet(result);
        return result;
    }
複製代碼
  1. getResultSet():
/**
     * 獲取結果集.
     *
     * @param resultSets 結果集列表
     * @param sqlStatement SQL語句對象
     * @return 結果集包裝
     * @throws SQLException SQL異常
     */
    public static ResultSet getResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
       //實例化ShardingResultSets
        ShardingResultSets shardingResultSets = new ShardingResultSets(resultSets);
        log.debug("Sharding-JDBC: Sharding result sets type is '{}'", shardingResultSets.getType().toString());
        //組裝結果集
        switch (shardingResultSets.getType()) {
            case EMPTY:
                return buildEmpty(resultSets);
            case SINGLE:
                return buildSingle(shardingResultSets);
            case MULTIPLE:
                return buildMultiple(shardingResultSets, sqlStatement);
            default:
                throw new UnsupportedOperationException(shardingResultSets.getType().toString());
        }
    }
複製代碼

2.1:實例化ShardingResultSetsbash

public ShardingResultSets(final List<ResultSet> resultSets) throws SQLException {
        this.resultSets = filterResultSets(resultSets);
        type = generateType();
    }
複製代碼

對於分片執行後獲得的ResultSet集合,過濾掉空的結果,對於非空,使用 WrapperResultSet 包裝起來app

問題:WrapperResultSet是個內部類,爲何還要專門新建一個內部類來處理下,直接用原生的不就好了麼?ide

答:WrapperResultSet 繼承了AbstractDelegateResultSet,這個類是被裝飾類(在調用ResultSet的next()方法獲取數據的時候,使用到了裝飾模式),同時這個類還重寫了firstNext() 和afterFirstNext()方法,獲取數據的時候會用到函數

private List<ResultSet> filterResultSets(final List<ResultSet> resultSets) throws SQLException {
        List<ResultSet> result = new ArrayList<>(resultSets.size());
        for (ResultSet each : resultSets) {
            if (each.next()) {
                result.add(new WrapperResultSet(each));
            }
        }
        return result;
    }
複製代碼

根據resultSets 集合的大小來判斷是單結果集仍是多結果集,多結果集的處理比較複雜(用到了裝飾模式),這裏指對於排序、分頁的處理ui

private Type generateType() {
        if (resultSets.isEmpty()) {
            return Type.EMPTY;
        } else if (1 == resultSets.size()) {
            return Type.SINGLE;
        } else {
            return Type.MULTIPLE;
        }
    }
複製代碼

2.2:根據ShardingResultSets的type屬性構建ResultSet的子類this

既然多結果集的狀況比較複雜,咱們就以複雜的例子來分析,上面的SQL也是分頁,排序都用上了。spa

private static ResultSet buildMultiple(final ShardingResultSets shardingResultSets, final SQLStatement sqlStatement) throws SQLException {
        ResultSetMergeContext resultSetMergeContext = new ResultSetMergeContext(shardingResultSets, sqlStatement);
        return buildCoupling(buildReducer(resultSetMergeContext), resultSetMergeContext);
    }
複製代碼

在分析多結果集以前,咱們先來了解下裝飾模式,多結果集就是使用這個模式來對結果集進行排序、分頁的。(關於裝飾對象,我以爲這篇文章寫得不錯)

裝飾模式的應用

上面這幅圖是結果集類間的依賴關係。

  • ResultSet:抽象的構建角色,也能夠理解爲被裝飾的原始對象
  • AbstractDelegateResultSet:Decorator,裝飾角色,內部維護一個抽象構建的引用,接受全部裝飾對象的請求,並轉發給真實的對象處理,這樣就能夠在調用真實對象的方法前,增長一些新的功能
  • IteratorReducerResultSet:具體的裝飾者,對於多個結果集,負責當一個結果集的數據處理完成後,切換到另一個結果集上面(多個結果集遍歷)
  • StreamingOrderByReducerResultSet:具體的裝飾者,內部維護一個PriorityQueue,負責對排序好的結果集消費
  • LimitCouplingResultSet: 具體的裝飾者,看名字就知道了吧,處理多結果集的分頁
  • WrapperResultSet:具體的裝飾者,主要負責移動到下一個數據

下面接着分析代碼: 咱們的SQL中帶有order by,因此返回StreamingOrderByReducerResultSet

  • buildReducer:
private static ResultSet buildReducer(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        //判斷分組歸併是否須要內存排序.
        if (resultSetMergeContext.isNeedMemorySortForGroupBy()) {
            resultSetMergeContext.setGroupByKeysToCurrentOrderByKeys();
            return new MemoryOrderByReducerResultSet(resultSetMergeContext);
        }
        //判斷分組是否須要排序(帶有order by)
        if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getOrderByList().isEmpty()) {
            return new StreamingOrderByReducerResultSet(resultSetMergeContext);
        }
        return new IteratorReducerResultSet(resultSetMergeContext);
    }
    
複製代碼

StreamingOrderByReducerResultSet的構造函數:

public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        //把resultSet傳遞到父類
        super(resultSetMergeContext.getShardingResultSets().getResultSets());
        //實例化PriorityQueue處理排序
        delegateResultSetQueue = new PriorityQueue<>(getResultSets().size());
        orderByKeys = resultSetMergeContext.getCurrentOrderByKeys();
    }
複製代碼

問題:爲何要用PriorityQueue 優先級隊列處理排序,而不用普通的list sort一下

回答:我認爲主要有2個方面:一、隊列內部用鏈表維護的,在作排序的時候直接更改節點指針就能夠,時間複雜度爲O(1),數組的話要作移位操做,時間複雜度O(n),因此鏈表看起來更合適。二、假設執行後有2個結果集A、B;如今對A、B結果集的數據進行排序(每一個結果集自己已是排序好的),用隊列的話,每次分別取2個結果集中的第一個數據放入隊列,每次只對其中2個數據排序,用完後便從隊列中移除(poll),這樣比較方便,而且每次排序也只是2個值比較,對於單個next取值的狀況 節省內存(數據量大的話,排序很佔用內存的把)

  • buildCoupling: SQL中帶有limit,而且只有一個order by 字段,因此返回LimitCouplingResultSet
private static ResultSet buildCoupling(final ResultSet resultSet, final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        ResultSet result = resultSet;
        //group by處理
        if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) {
            result = new GroupByCouplingResultSet(result, resultSetMergeContext);
        }
        //判斷是否須要內存排序:什麼狀況下須要?在多個order by 字段的時候
        if (resultSetMergeContext.isNeedMemorySortForOrderBy()) {
            resultSetMergeContext.setOrderByKeysToCurrentOrderByKeys();
            result = new MemoryOrderByCouplingResultSet(result, resultSetMergeContext);
        }
        //分頁處理
        if (null != resultSetMergeContext.getSqlStatement().getLimit()) {
            result = new LimitCouplingResultSet(result, resultSetMergeContext.getSqlStatement());
        }
        return result;
    }
複製代碼

至此,裝飾模式須要的類已經構建好了,分別是:LimitCouplingResultSet處理分頁、StreamingOrderByReducerResultSet處理排序、WrapperResultSet

resultSet.next():

AbstractDelegateResultSet 重寫了resultSet.next()方法,下面是重寫的邏輯:

@Override
    public final boolean next() throws SQLException {
    //beforeFirst 默認true,走firstNext
        boolean result = beforeFirst ? firstNext() : afterFirstNext();
        beforeFirst = false;
        if (result) {
            LoggerFactory.getLogger(this.getClass().getName()).debug(
                    "Access result set, total size is: {}, result set hashcode is: {}, offset is: {}", getResultSets().size(), delegate.hashCode(), ++offset);
        }
        return result;
    }
複製代碼

LimitCouplingResultSet#firstNext():

對於A、B 2個結果集,好比要查 10,15索引位的數據,那麼咱們會把0,15索引位的結果查詢出來,而後再過濾掉結果集A 10索引位前的數據,剩下5個數據再從A、B結果集取

@Override
    protected boolean firstNext() throws SQLException {
        return skipOffset() && doNext();
    }
    //過濾offset索引位前的數據
    private boolean skipOffset() throws SQLException {
        for (int i = 0; i < limit.getOffset(); i++) {
          // 若是沒有數據了,就返回false,說明A結果集沒有數據了,交給下一個裝飾類,切換到B結果集
            if (!getDelegate().next()) {
                return false;
            }
        }
        return true;
    }
    //當rowNumber>rowCOunt,說明已經取夠了5條數據,此時能夠返回了
    private boolean doNext() throws SQLException {
        return ++rowNumber <= limit.getRowCount() && getDelegate().next();
    }
複製代碼

分頁處理完,getDelegate().next() 調用StreamingOrderByReducerResultSet#next,StreamingOrderByReducerResultSet繼承了AbstractDelegateResultSet,因此也是走的上面重寫的next()邏輯。


  • StreamingOrderByReducerResultSet#firstNext()

遍歷A、B 2個結果集,分別取出結果集中的第一個元素,放入隊列中,peek出第一個元素(此時的元素已經按照排序規則排好),setDelegate()切換包裝(排序後)的結果集,這樣下一個裝飾類獲取到的就是排序後的結果集

protected boolean firstNext() throws SQLException {
        for (ResultSet each : getResultSets()) {
            ResultSetOrderByWrapper wrapper = new ResultSetOrderByWrapper(each);
            //wrapper#next()取出第一個元素
            if (wrapper.next()) {
                delegateResultSetQueue.offer(wrapper);
            }
        }
        return doNext();
    }
    
    private boolean doNext() {
        if (delegateResultSetQueue.isEmpty()) {
            return false;
        }
        setDelegate(delegateResultSetQueue.peek().delegate);
        log.trace("Chosen order by value: {}, current result set hashcode: {}", delegateResultSetQueue.peek().row, getDelegate().hashCode());
        return true;
    }
    
    @RequiredArgsConstructor
    private class ResultSetOrderByWrapper implements Comparable<ResultSetOrderByWrapper> {
        
        private final ResultSet delegate;
        //具備排序功能的數據行對象
        private OrderByResultSetRow row;
        
        boolean next() throws SQLException {
        // 調用next()
            boolean result = delegate.next();
            //有值
            if (result) {
               //實例化 帶有排序值的行對象
                row = new OrderByResultSetRow(delegate, orderByKeys);
            }
            return result;
        }
        //比較
        @Override
        public int compareTo(final ResultSetOrderByWrapper o) {
            return row.compareTo(o.row);
        }
    }
複製代碼

問:怎麼排序的?

答:ResultSetOrderByWrapper 實現了Comparable接口,咱們調用next方法,實例化了OrderByResultSetRow 這一行對象,行對象把排序的字段值取到,也重寫了Comparable接口,當咱們把ResultSetOrderByWrapper對象塞到隊列裏,隊列會調用對象的compareTo方法,對隊列的數據進行從新排序,這樣取出來的第一個元素就是排好序後的元素。

排序相關代碼:

public final class OrderByResultSetRow extends AbstractResultSetRow implements Comparable<OrderByResultSetRow> {
    
    private final List<OrderBy> orderBies;
    
    private final List<Comparable<?>> orderByValues;
    
    public OrderByResultSetRow(final ResultSet resultSet, final List<OrderBy> orderBies) throws SQLException {
        super(resultSet);
        this.orderBies = orderBies;
        orderByValues = loadOrderByValues();
    }
    //加載排序字段的值
    private List<Comparable<?>> loadOrderByValues() {
        List<Comparable<?>> result = new ArrayList<>(orderBies.size());
        for (OrderBy each : orderBies) {
            Object value = getCell(each.getColumnIndex());
            Preconditions.checkState(value instanceof Comparable, "Sharding-JDBC: order by value must extends Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }
    //從新排序規則
    @Override
    public int compareTo(final OrderByResultSetRow otherOrderByValue) {
        for (int i = 0; i < orderBies.size(); i++) {
            OrderBy thisOrderBy = orderBies.get(i);
            int result = ResultSetUtil.compareTo(orderByValues.get(i), otherOrderByValue.orderByValues.get(i), thisOrderBy.getOrderByType());
            if (0 != result) {
                return result;
            }
        }
        return 0;
    }
}
複製代碼

排好序後,AbstractDelegateResultSet 的ResultSet delegate屬性就是正確的結果集,調用getString()之類的方法獲取SQL結果。

@Override
    public final String getString(final String columnLabel) throws SQLException {
        return delegate.getString(columnLabel);
    }
複製代碼

最後:

小尾巴走一波,歡迎關注個人公衆號,不按期分享編程、投資、生活方面的感悟:)

相關文章
相關標籤/搜索