數據庫中間件 Sharding-JDBC 源碼分析 —— 結果歸併

摘要: 原創出處 http://www.iocoder.cn/Sharding-JDBC/result-merger/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java

本文主要基於 Sharding-JDBC 1.5.0 正式版git


🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:github

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右
  5. 認真的源碼交流微信羣。

1. 概述

本文分享查詢結果歸併的源碼實現。算法

正如前文《SQL 執行》提到的**「分表分庫,須要執行的 SQL 數量從單條變成了多條」,多個SQL執行**結果必然須要進行合併,例如:sql

SELECT * FROM t_order ORDER BY create_time

在各分片排序完後,Sharding-JDBC 獲取到結果後,仍然須要再進一步排序。目前有 分頁分組排序聚合列迭代 五種場景須要作進一步處理。固然,若是單分片SQL執行結果是無需合併的。在《SQL 執行》不知不覺已經分享了插入、更新、刪除操做的結果合併,因此下面咱們一塊兒看看查詢結果歸併的實現。數據庫


Sharding-JDBC 正在收集使用公司名單:傳送門
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
數組

2. MergeEngine

MergeEngine,分片結果集歸併引擎。緩存

// MergeEngine.java
/**
* 數據庫類型
*/
private final DatabaseType databaseType;
/**
* 結果集集合
*/
private final List<ResultSet> resultSets;
/**
* Select SQL語句對象
*/
private final SelectStatement selectStatement;
/**
* 查詢列名與位置映射
*/
private final Map<String, Integer> columnLabelIndexMap;
    
public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
   this.databaseType = databaseType;
   this.resultSets = resultSets;
   this.selectStatement = selectStatement;
   // 得到 查詢列名與位置映射
   columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
}

/**
* 得到 查詢列名與位置映射
*
* @param resultSet 結果集
* @return 查詢列名與位置映射
* @throws SQLException 當結果集已經關閉
*/
private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {
   ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元數據(包含查詢列信息)
   Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
   for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
       result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
   }
   return result;
}
  • 當 MergeEngine 被建立時,會傳入 resultSets 結果集集合,並根據其得到 columnLabelIndexMap 查詢列名與位置映射。經過 columnLabelIndexMap,能夠很方便的使用查詢列名得到在返回結果記錄列( header )的第幾列。

MergeEngine 的 #merge() 方法做爲入口提供查詢結果歸併功能。微信

/**
* 合併結果集.
*
* @return 歸併完畢後的結果集
* @throws SQLException SQL異常
*/
public ResultSetMerger merge() throws SQLException {
   selectStatement.setIndexForItems(columnLabelIndexMap);
   return decorate(build());
}
  • #merge() 主體邏輯就兩行代碼,設置查詢列位置信息,並返回合適的歸併結果集接口( ResultSetMerger ) 實現。

2.1 SelectStatement#setIndexForItems()

// SelectStatement.java
/**
* 爲選擇項設置索引.
* 
* @param columnLabelIndexMap 列標籤索引字典
*/
public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) {
   setIndexForAggregationItem(columnLabelIndexMap);
   setIndexForOrderItem(columnLabelIndexMap, orderByItems);
   setIndexForOrderItem(columnLabelIndexMap, groupByItems);
}
  • 部分查詢列是通過推到出來,在 SQL解析 過程當中,未得到到查詢列位置,須要經過該方法進行初始化。對這塊不瞭解的同窗,回頭能夠看下《SQL 解析(三)之查詢SQL》。🙂 如今不用回頭,皇冠會掉。數據結構

  • #setIndexForAggregationItem() 處理 AVG聚合計算列 推導出其對應的 SUM/COUNT 聚合計算列的位置:

    private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) {
       for (AggregationSelectItem each : getAggregationSelectItems()) {
           Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));
           each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
           for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
               Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived));
               derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));
           }
       }
    }
  • #setIndexForOrderItem() 處理 ORDER BY / GROUP BY 列不在查詢列 推導出的查詢列的位置:

    private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) {
        for (OrderItem each : orderItems) {
          if (-1 != each.getIndex()) {
              continue;
          }
          Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each));
          if (columnLabelIndexMap.containsKey(each.getColumnLabel())) {
              each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
          }
        }
    }

2.2 ResultSetMerger

ResultSetMerger,歸併結果集接口。

咱們先來看看總體的類結構關係:

功能 上分紅四種:

  • 分組:GroupByMemoryResultSetMerger、GroupByStreamResultSetMerger;包含聚合列
  • 排序:OrderByStreamResultSetMerger
  • 迭代:IteratorStreamResultSetMerger
  • 分頁:LimitDecoratorResultSetMerger

實現方式 上分紅三種:

  • Stream 流式:AbstractStreamResultSetMerger
  • Memory 內存:AbstractMemoryResultSetMerger
  • Decorator 裝飾者:AbstractDecoratorResultSetMerger

何時該用什麼實現方式?

  • Stream 流式:將數據遊標與結果集的遊標保持一致,順序的從結果集中一條條的獲取正確的數據。看完下文第三節 OrderByStreamResultSetMerger 能夠形象的理解。
  • Memory 內存:須要將結果集的全部數據都遍歷並存儲在內存中,再經過內存歸併後,將內存中的數據假裝成結果集返回。看完下文第五節 GroupByMemoryResultSetMerger 能夠形象的理解。
  • Decorator 裝飾者:能夠和前兩者任意組合
// MergeEngine.java
/**
* 合併結果集.
*
* @return 歸併完畢後的結果集
* @throws SQLException SQL異常
*/
public ResultSetMerger merge() throws SQLException {
   selectStatement.setIndexForItems(columnLabelIndexMap);
   return decorate(build());
}
    
private ResultSetMerger build() throws SQLException {
   if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 分組 或 聚合列
       if (selectStatement.isSameGroupByAndOrderByItems()) {
           return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
       } else {
           return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
       }
   }
   if (!selectStatement.getOrderByItems().isEmpty()) {
       return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());
   }
   return new IteratorStreamResultSetMerger(resultSets);
}
    
private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
   ResultSetMerger result = resultSetMerger;
   if (null != selectStatement.getLimit()) {
       result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
   }
   return result;
}

2.2.1 AbstractStreamResultSetMerger

AbstractStreamResultSetMerger,流式歸併結果集抽象類,提供從當前結果集得到行數據。

public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
    
    /**
     * 當前結果集
     */
    private ResultSet currentResultSet;
    
    protected ResultSet getCurrentResultSet() throws SQLException {
        if (null == currentResultSet) {
            throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");
        }
        return currentResultSet;
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        if (Object.class == type) {
            return getCurrentResultSet().getObject(columnIndex);
        }
        if (int.class == type) {
            return getCurrentResultSet().getInt(columnIndex);
        }
        if (String.class == type) {
            return getCurrentResultSet().getString(columnIndex);
        }
        // .... 省略其餘數據類型讀取相似代碼
        return getCurrentResultSet().getObject(columnIndex);
    }
}

2.2.2 AbstractMemoryResultSetMerger

AbstractMemoryResultSetMerger,內存歸併結果集抽象類,提供從內存數據行對象( MemoryResultSetRow ) 得到行數據。

public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger {
    
    private final Map<String, Integer> labelAndIndexMap;
    /**
     * 內存數據行對象
     */
    @Setter
    private MemoryResultSetRow currentResultSetRow;
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
            throw new SQLFeatureNotSupportedException();
        }
        return currentResultSetRow.getCell(columnIndex);
    }
}
  • 和 AbstractStreamResultSetMerger 對比,貌似區別不大?!確實,從抽象父類上看,兩種實現方式差很少。抽象父類提供給實現子類的是數據讀取的功能,真正的流式歸併、內存歸併是在子類實現上體現。
public class MemoryResultSetRow {

    /**
     * 行數據
     */
    private final Object[] data;
    
    public MemoryResultSetRow(final ResultSet resultSet) throws SQLException {
        data = load(resultSet);
    }

    /**
     * 加載 ResultSet 當前行數據到內存
     * @param resultSet 結果集
     * @return 行數據
     * @throws SQLException 當結果集關閉
     */
    private Object[] load(final ResultSet resultSet) throws SQLException {
        int columnCount = resultSet.getMetaData().getColumnCount();
        Object[] result = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            result[i] = resultSet.getObject(i + 1);
        }
        return result;
    }
    
    /**
     * 獲取數據.
     * 
     * @param columnIndex 列索引
     * @return 數據
     */
    public Object getCell(final int columnIndex) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        return data[columnIndex - 1];
    }
    
    /**
     * 設置數據.
     *
     * @param columnIndex 列索引
     * @param value 值
     */
    public void setCell(final int columnIndex, final Object value) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        data[columnIndex - 1] = value;
    }
}
  • 調用 #load() 方法,將當前結果集的一條行數據加載到內存。

2.2.3 AbstractDecoratorResultSetMerger

AbstractDecoratorResultSetMerger,裝飾結果集歸併抽象類,經過調用其裝飾的歸併對象 #getValue() 方法得到行數據。

public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger {

    /**
     * 裝飾的歸併對象
     */
    private final ResultSetMerger resultSetMerger;
        
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return resultSetMerger.getValue(columnIndex, type);
    }
}

3. OrderByStreamResultSetMerger

OrderByStreamResultSetMerger,基於 Stream 方式排序歸併結果集實現。

3.1 歸併算法

由於各個分片結果集已經排序完成,使用**《歸併算法》**可以充分利用這個優點。

歸併操做(merge),也叫歸併算法,指的是將兩個已經排序的序列合併成一個序列的操做。歸併排序算法依賴歸併操做。

【迭代法】

  1. 申請空間,使其大小爲兩個已經排序序列之和,該空間用來存放合併後的序列
  2. 設定兩個指針,最初位置分別爲兩個已經排序序列的起始位置
  3. 比較兩個指針所指向的元素,選擇相對小的元素放入到合併空間,並移動指針到下一位置
  4. 重複步驟3直到某一指針到達序列尾
  5. 將另外一序列剩下的全部元素直接複製到合併序列尾

從定義上看,是否是超級符合咱們這個場景。😈 此時此刻,你是否是捂着胸口,感嘆:「大學怎麼沒好好學數據結構與算法呢」?反正我是捂着了,都是眼淚。

public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger {

    /**
     * 排序列
     */
    @Getter(AccessLevel.NONE)
    private final List<OrderItem> orderByItems;
    /**
     * 排序值對象隊列
     */
    private final Queue<OrderByValue> orderByValuesQueue;
    /**
     * 默認排序類型
     */
    private final OrderType nullOrderType;
    /**
     * 是否第一個 ResultSet 已經調用 #next()
     */
    private boolean isFirstNext;
    
    public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException {
        this.orderByItems = orderByItems;
        this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
        this.nullOrderType = nullOrderType;
        orderResultSetsToQueue(resultSets);
        isFirstNext = true;
    }
    
    private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
        for (ResultSet each : resultSets) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
        // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄
        setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
    }
  • 屬性 orderByValuesQueue 使用的隊列實現是優先級隊列( PriorityQueue )。有興趣的同窗能夠看看《JDK源碼研究PriorityQueue》,本文不展開講,不是主角戲份很少。咱們記住幾個方法的用途:

    • #offer():增長元素。增長時,會將該元素和已有元素們按照優先級進行排序
    • #peek():得到優先級第一的元素
    • #pool():得到優先級第一的元素並移除
  • 一個 ResultSet 構建一個 OrderByValue 用於排序,即上文歸併算法提到的**「空間」**。

    public final class OrderByValue implements Comparable<OrderByValue> {
    
        /**
         * 已排序結果集
         */
        @Getter
        private final ResultSet resultSet;
        /**
         * 排序列
         */
        private final List<OrderItem> orderByItems;
        /**
         * 默認排序類型
         */
        private final OrderType nullOrderType;
        /**
         * 排序列對應的值數組
         * 由於一條記錄可能有多個排序列,因此是數組
         */
        private List<Comparable<?>> orderValues;
    
        /**
         * 遍歷下一個結果集遊標.
         * 
         * @return 是否有下一個結果集
         * @throws SQLException SQL異常
         */
        public boolean next() throws SQLException {
            boolean result = resultSet.next();
            orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList();
            return result;
        }
    
        /**
         * 得到 排序列對應的值數組
         *
         * @return 排序列對應的值數組
         * @throws SQLException 當結果集關閉時
         */
        private List<Comparable<?>> getOrderValues() throws SQLException {
            List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
            for (OrderItem each : orderByItems) {
                Object value = resultSet.getObject(each.getIndex());
                Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");
                result.add((Comparable<?>) value);
            }
            return result;
        }
    
        /**
         * 對比 {@link #orderValues},即二者的第一條記錄
         *
         * @param o 對比 OrderByValue
         * @return -1 0 1
         */
        @Override
        public int compareTo(final OrderByValue o) {
            for (int i = 0; i < orderByItems.size(); i++) {
                OrderItem thisOrderBy = orderByItems.get(i);
                int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), nullOrderType);
                if (0 != result) {
                    return result;
                }
            }
            return 0;
        }
    }
    • 調用 OrderByValue#next() 方法時,得到其對應結果集排在第一條的記錄,經過 #getOrderValues() 計算該記錄的排序字段值。這樣兩個OrderByValue 經過 #compareTo() 方法能夠比較兩個結果集的第一條記錄。
  • if (orderByValue.next()) { 處,調用 OrderByValue#next() 後,添加到 PriorityQueue。所以,orderByValuesQueue.peek().getResultSet() 可以得到多個 ResultSet 中排在第一的。

3.2 #next()

經過調用 OrderByStreamResultSetMerger#next() 不斷得到當前排在第一的記錄。#next() 每次調用後,實際作的是當前 ResultSet 的替換,以及當前的 ResultSet 的記錄指向下一條。這樣提及來可能比較繞,咱們來看一張圖:

  • 白色向下箭頭:OrderByStreamResultSetMerger 對 ResultSet 的指向。
  • 黑色箭頭:ResultSet 對當前記錄的指向。
  • ps:這塊若是分享的不清晰讓您費勁,十分抱歉。歡迎加我微信(wangwenbin-server)交流下,這樣我也能夠優化表述。
// OrderByStreamResultSetMerger.java
@Override
public boolean next() throws SQLException {
   if (orderByValuesQueue.isEmpty()) {
       return false;
   }
   if (isFirstNext) {
       isFirstNext = false;
       return true;
   }
   // 移除上一次得到的 ResultSet
   OrderByValue firstOrderByValue = orderByValuesQueue.poll();
   // 若是上一次得到的 ResultSet還有下一條記錄,繼續添加到 排序值對象隊列
   if (firstOrderByValue.next()) {
       orderByValuesQueue.offer(firstOrderByValue);
   }
   if (orderByValuesQueue.isEmpty()) {
       return false;
   }
   // 設置當前 ResultSet
   setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
   return true;
}
  • orderByValuesQueue.poll() 移除上一次得到的 ResultSet。爲何不能 #setCurrentResultSet() 就移除呢?若是該 ResultSet 裏面還存在下一條記錄,須要繼續參加排序。而判斷是否有下一條,須要調用 ResultSet#next() 方法,這會致使 ResultSet 指向了下一條記錄。於是 orderByValuesQueue.poll() 調用是後置的。

  • isFirstNext 變量那的判斷看着是否是很「靈異」?由於 #orderResultSetsToQueue() 處設置了第一次的 ResultSet。若是不加這個標記,會致使第一條記錄「不見」了。

  • 經過不斷的 Queue#poll()Queue#offset() 實現排序。巧妙!彷彿 Get 新技能了:

    // 移除上一次得到的 ResultSet
    OrderByValue firstOrderByValue = orderByValuesQueue.poll();
    // 若是上一次得到的 ResultSet還有下一條記錄,繼續添加到 排序值對象隊列
    if (firstOrderByValue.next()) {
      orderByValuesQueue.offer(firstOrderByValue);
    }

在看下,咱們上文 Stream 方式歸併的定義:**將數據遊標與結果集的遊標保持一致,順序的從結果集中一條條的獲取正確的數據。**是否是可以清晰的對上了?!🙂

4. GroupByStreamResultSetMerger

GroupByStreamResultSetMerger,基於 Stream 方式分組歸併結果集實現。 它繼承自 OrderByStreamResultSetMerger,在排序的邏輯上,實現分組功能。實現原理也較爲簡單:

public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {

    /**
     * 查詢列名與位置映射
     */
    private final Map<String, Integer> labelAndIndexMap;
    /**
     * Select SQL語句對象
     */
    private final SelectStatement selectStatement;
    /**
     * 當前結果記錄
     */
    private final List<Object> currentRow;
    /**
     * 下一條結果記錄 GROUP BY 條件
     */
    private List<?> currentGroupByValues;
    
    public GroupByStreamResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {
        super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
        this.labelAndIndexMap = labelAndIndexMap;
        this.selectStatement = selectStatement;
        currentRow = new ArrayList<>(labelAndIndexMap.size());
        // 初始化下一條結果記錄 GROUP BY 條件
        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return currentRow.get(columnIndex - 1);
    }
    @Override
    public Object getValue(final String columnLabel, final Class<?> type) throws SQLException {
        Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel), String.format("Can't find columnLabel: %s", columnLabel));
        return currentRow.get(labelAndIndexMap.get(columnLabel) - 1);
    }
}
  • currentRow 爲當前結果記錄,使用 #getValue()#getCalendarValue() 方法得到當前結果記錄的查詢列值。

  • currentGroupByValues下一條結果記錄 GROUP BY 條件,經過 GroupByValue 生成:

    public final class GroupByValue {
    
        /**
         * 分組條件值數組
         */
        private final List<?> groupValues;
    
        public GroupByValue(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
            groupValues = getGroupByValues(resultSet, groupByItems);
        }
    
        /**
         * 得到分組條件值數組
         * 例如,`GROUP BY user_id, order_status` 返回的某條記錄結果爲 `userId = 1, order_status = 3`,對應的 `groupValues = [1, 3]`
         * @param resultSet 結果集(單分片)
         * @param groupByItems 分組列
         * @return 分組條件值數組
         * @throws SQLException 當結果集關閉
         */
        private List<?> getGroupByValues(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
            List<Object> result = new ArrayList<>(groupByItems.size());
            for (OrderItem each : groupByItems) {
                result.add(resultSet.getObject(each.getIndex())); // 從結果集得到每一個分組條件的值
            }
            return result;
        }
    }
  • GroupByStreamResultSetMerger 在建立時,當前結果記錄實際未合併,須要先調用 #next(),在使用 #getValue() 等方法獲取值,這個和 OrderByStreamResultSetMerger 不一樣,多是個 BUG。

4.1 AggregationUnit

AggregationUnit,歸併計算單元接口,有兩個接口方法:

  • #merge():歸併聚合值
  • #getResult():獲取計算結果

一共有三個實現類:

實現都比較易懂,直接點擊連接查看源碼,咱們就不浪費篇幅貼代碼啦。

4.2 #next()

咱們先看看大致的調用流程:

😈 看起來代碼比較多,邏輯其實比較清晰,對照着順序圖順序往下讀便可。

// GroupByStreamResultSetMerger.java
@Override
public boolean next() throws SQLException {
   // 清除當前結果記錄
   currentRow.clear();
   if (getOrderByValuesQueue().isEmpty()) {
       return false;
   }
   //
   if (isFirstNext()) {
       super.next();
   }
   // 順序合併下面相同分組條件的記錄
   if (aggregateCurrentGroupByRowAndNext()) {
       // 生成下一條結果記錄 GROUP BY 條件
       currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
   }
   return true;
}

private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
   boolean result = false;
   // 生成計算單元
   Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
       
       @Override
       public AggregationUnit apply(final AggregationSelectItem input) {
           return AggregationUnitFactory.create(input.getType());
       }
   });
   // 循環順序合併下面相同分組條件的記錄
   while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
       // 歸併聚合值
       aggregate(aggregationUnitMap);
       // 緩存當前記錄到結果記錄
       cacheCurrentRow();
       // 獲取下一條記錄
       result = super.next();
       if (!result) {
           break;
       }
   }
   // 設置當前記錄的聚合字段結果
   setAggregationValueToCurrentRow(aggregationUnitMap);
   return result;
}
    
private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {
   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
       List<Comparable<?>> values = new ArrayList<>(2);
       if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) { // SUM/COUNT/MAX/MIN 聚合列
           values.add(getAggregationValue(entry.getKey()));
       } else {
           for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) { // AVG 聚合列
               values.add(getAggregationValue(each));
           }
       }
       entry.getValue().merge(values);
   }
}
    
private void cacheCurrentRow() throws SQLException {
   for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {
       currentRow.add(getCurrentResultSet().getObject(i + 1));
   }
}
    
private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException {
   Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex());
   Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
   return (Comparable<?>) result;
}
    
private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) {
   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
       currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult()); // 獲取計算結果
   }
}

5. GroupByMemoryResultSetMerger

GroupByMemoryResultSetMerger,基於 內存 分組歸併結果集實現。

區別於 GroupByStreamResultSetMerger,其沒法使用每一個分片結果集的有序的特色,只能在內存中合併後,進行整個從新排序。於是,性能和內存都較 GroupByStreamResultSetMerger 會差。

主流程以下:

public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger {

    /**
     * Select SQL語句對象
     */
    private final SelectStatement selectStatement;
    /**
     * 默認排序類型
     */
    private final OrderType nullOrderType;
    /**
     * 內存結果集
     */
    private final Iterator<MemoryResultSetRow> memoryResultSetRows;
    
    public GroupByMemoryResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {
        super(labelAndIndexMap);
        this.selectStatement = selectStatement;
        this.nullOrderType = nullOrderType;
        memoryResultSetRows = init(resultSets);
    }
    
    private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {
        Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024); // 分組條件值與內存記錄映射
        Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024); // 分組條件值與聚合列映射
        // 遍歷結果集
        for (ResultSet each : resultSets) {
            while (each.next()) {
                // 生成分組條件
                GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());
                // 初始化分組條件到 dataMap、aggregationMap 映射
                initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);
                // 歸併聚合值
                aggregate(each, groupByValue, aggregationMap);
            }
        }
        // 設置聚合列結果到內存記錄
        setAggregationValueToMemoryRow(dataMap, aggregationMap);
        // 內存排序
        List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);
        // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄
        if (!result.isEmpty()) {
            setCurrentResultSetRow(result.get(0));
        }
        return result.iterator();
    }
}
  • #initForFirstGroupByValue() 初始化分組條件dataMapaggregationMap 映射中,這樣能夠調用 #aggregate() 將聚合值歸併到 aggregationMap 裏的該分組條件。

    private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, 
                                          final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
        // 初始化分組條件到 dataMap
        if (!dataMap.containsKey(groupByValue)) {
            dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));
        }
        // 初始化分組條件到 aggregationMap
        if (!aggregationMap.containsKey(groupByValue)) {
            Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
    
                @Override
                public AggregationUnit apply(final AggregationSelectItem input) {
                    return AggregationUnitFactory.create(input.getType());
                }
            });
            aggregationMap.put(groupByValue, map);
        }
    }
  • 聚合完每一個分組條件後,將聚合列結果 aggregationMap 合併到 dataMap

    private void setAggregationValueToMemoryRow(final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) {
       for (Entry<GroupByValue, MemoryResultSetRow> entry : dataMap.entrySet()) { // 遍 歷內存記錄
           for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { // 遍歷 每一個聚合列
               entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult());
           }
       }
    }
  • 調用 #getMemoryResultSetRows() 方法對內存記錄進行內存排序

// GroupByMemoryResultSetMerger.java
private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {
   List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values());
   Collections.sort(result, new GroupByRowComparator(selectStatement, nullOrderType)); // 內存排序
   return result;
}

// GroupByRowComparator.java
private int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2, final List<OrderItem> orderItems) {
   for (OrderItem each : orderItems) {
       Object orderValue1 = o1.getCell(each.getIndex());
       Preconditions.checkState(null == orderValue1 || orderValue1 instanceof Comparable, "Order by value must implements Comparable");
       Object orderValue2 = o2.getCell(each.getIndex());
       Preconditions.checkState(null == orderValue2 || orderValue2 instanceof Comparable, "Order by value must implements Comparable");
       int result = ResultSetUtil.compareTo((Comparable) orderValue1, (Comparable) orderValue2, each.getType(), nullOrderType);
       if (0 != result) {
           return result;
       }
   }
   return 0;
}
  • 總的來講,GROUP BY 內存歸併和咱們平常使用 Map 計算用戶訂單數是比較類似的。

5.1 #next()

@Override
public boolean next() throws SQLException {
   if (memoryResultSetRows.hasNext()) {
       setCurrentResultSetRow(memoryResultSetRows.next());
       return true;
   }
   return false;
}
  • 內存歸併完成後,使用 memoryResultSetRows 不斷得到下一條記錄。

6. IteratorStreamResultSetMerger

IteratorStreamResultSetMerger,基於 Stream 迭代歸併結果集實現。

public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger {

    /**
     * ResultSet 數組迭代器
     */
    private final Iterator<ResultSet> resultSets;

    public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
        this.resultSets = resultSets.iterator();
        // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄
        setCurrentResultSet(this.resultSets.next());
    }

    @Override
    public boolean next() throws SQLException {
        // 當前 ResultSet 迭代下一條記錄
        if (getCurrentResultSet().next()) {
            return true;
        }
        if (!resultSets.hasNext()) {
            return false;
        }
        // 得到下一個ResultSet, 設置當前 ResultSet
        setCurrentResultSet(resultSets.next());
        boolean hasNext = getCurrentResultSet().next();
        if (hasNext) {
            return true;
        }
        while (!hasNext && resultSets.hasNext()) {
            setCurrentResultSet(resultSets.next());
            hasNext = getCurrentResultSet().next();
        }
        return hasNext;
    }
}

7. LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger,基於 Decorator 分頁結果集歸併實現。

public final class LimitDecoratorResultSetMerger extends AbstractDecoratorResultSetMerger {

    /**
     * 分頁條件
     */
    private final Limit limit;
    /**
     * 是否所有記錄都跳過了,即無符合條件記錄
     */
    private final boolean skipAll;
    /**
     * 當前已返回行數
     */
    private int rowNumber;
    
    public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
        super(resultSetMerger);
        this.limit = limit;
        skipAll = skipOffset();
    }
    
    private boolean skipOffset() throws SQLException {
        // 跳過 skip 記錄
        for (int i = 0; i < limit.getOffsetValue(); i++) {
            if (!getResultSetMerger().next()) {
                return true;
            }
        }
        // 行數
        rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue();
        return false;
    }
    
    @Override
    public boolean next() throws SQLException {
        if (skipAll) {
            return false;
        }
        // 得到下一條記錄
        if (limit.getRowCountValue() > -1) {
            return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
        }
        // 部分db 能夠直 offset,不寫 limit 行數,例如 oracle
        return getResultSetMerger().next();
    }

}
  • LimitDecoratorResultSetMerger 能夠對其餘 ResultSetMerger 進行裝飾,調用其餘 ResultSetMerger 的 #next() 不斷得到下一條記錄。

666. 彩蛋

誒?應該是有蠻多地方解釋的不是很清晰,若是讓您閱讀誤解或是阻塞,很是抱歉。代碼讀起來比較易懂,使用文字來解釋,對錶述能力較差的本身,可能就絞盡腦汁,一臉懵逼。

恩,若是能夠,還煩請把讀起來不太爽的地方告訴我,謝謝。

厚着臉皮,道友,分享一波朋友圈可好?

以下是小禮包,嘿嘿

歸併結果集接口 SQL
OrderByStreamResultSetMerger SELECT * FROM t_order ORDER BY id
GroupByStreamResultSetMerger SELECT uid, AVG(id) FROM t_order GROUP BY uid
GroupByMemoryResultSetMerger SELECT uid FROM t_order GROUP BY id ORDER BY id DESC
IteratorStreamResultSetMerger SELECT * FROM t_order
LimitDecoratorResultSetMerger SELECT * FROM t_order ORDER BY id LIMIT 10
相關文章
相關標籤/搜索