摘要: 原創出處 http://www.iocoder.cn/Sharding-JDBC/result-merger/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 Sharding-JDBC 1.5.0 正式版git
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文分享查詢結果歸併的源碼實現。算法
正如前文《SQL 執行》提到的**「分表分庫,須要執行的 SQL 數量從單條變成了多條」,多個SQL執行**結果必然須要進行合併,例如:sql
SELECT * FROM t_order ORDER BY create_time
在各分片排序完後,Sharding-JDBC 獲取到結果後,仍然須要再進一步排序。目前有 分頁、分組、排序、聚合列、迭代 五種場景須要作進一步處理。固然,若是單分片SQL執行結果是無需合併的。在《SQL 執行》不知不覺已經分享了插入、更新、刪除操做的結果合併,因此下面咱們一塊兒看看查詢結果歸併的實現。數據庫
Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門數組
MergeEngine,分片結果集歸併引擎。緩存
// MergeEngine.java /** * 數據庫類型 */ private final DatabaseType databaseType; /** * 結果集集合 */ private final List<ResultSet> resultSets; /** * Select SQL語句對象 */ private final SelectStatement selectStatement; /** * 查詢列名與位置映射 */ private final Map<String, Integer> columnLabelIndexMap; public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType; this.resultSets = resultSets; this.selectStatement = selectStatement; // 得到 查詢列名與位置映射 columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0)); } /** * 得到 查詢列名與位置映射 * * @param resultSet 結果集 * @return 查詢列名與位置映射 * @throws SQLException 當結果集已經關閉 */ private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元數據(包含查詢列信息) Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i); } return result; }
resultSets
結果集集合,並根據其得到 columnLabelIndexMap
查詢列名與位置映射。經過 columnLabelIndexMap
,能夠很方便的使用查詢列名得到在返回結果記錄列( header )的第幾列。MergeEngine 的 #merge()
方法做爲入口提供查詢結果歸併功能。微信
/** * 合併結果集. * * @return 歸併完畢後的結果集 * @throws SQLException SQL異常 */ public ResultSetMerger merge() throws SQLException { selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); }
#merge()
主體邏輯就兩行代碼,設置查詢列位置信息,並返回合適的歸併結果集接口( ResultSetMerger ) 實現。// SelectStatement.java /** * 爲選擇項設置索引. * * @param columnLabelIndexMap 列標籤索引字典 */ public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) { setIndexForAggregationItem(columnLabelIndexMap); setIndexForOrderItem(columnLabelIndexMap, orderByItems); setIndexForOrderItem(columnLabelIndexMap, groupByItems); }
部分查詢列是通過推到出來,在 SQL解析 過程當中,未得到到查詢列位置,須要經過該方法進行初始化。對這塊不瞭解的同窗,回頭能夠看下《SQL 解析(三)之查詢SQL》。🙂 如今不用回頭,皇冠會掉。數據結構
#setIndexForAggregationItem()
處理 AVG聚合計算列 推導出其對應的 SUM/COUNT 聚合計算列的位置:
private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) { for (AggregationSelectItem each : getAggregationSelectItems()) { Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each)); each.setIndex(columnLabelIndexMap.get(each.getColumnLabel())); for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) { Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived)); derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel())); } } }
#setIndexForOrderItem()
處理 ORDER BY / GROUP BY 列不在查詢列 推導出的查詢列的位置:
private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) { for (OrderItem each : orderItems) { if (-1 != each.getIndex()) { continue; } Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each)); if (columnLabelIndexMap.containsKey(each.getColumnLabel())) { each.setIndex(columnLabelIndexMap.get(each.getColumnLabel())); } } }
ResultSetMerger,歸併結果集接口。
咱們先來看看總體的類結構關係:
從 功能 上分紅四種:
從 實現方式 上分紅三種:
何時該用什麼實現方式?
// MergeEngine.java /** * 合併結果集. * * @return 歸併完畢後的結果集 * @throws SQLException SQL異常 */ public ResultSetMerger merge() throws SQLException { selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); } private ResultSetMerger build() throws SQLException { if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 分組 或 聚合列 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else { return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); } private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException { ResultSetMerger result = resultSetMerger; if (null != selectStatement.getLimit()) { result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit()); } return result; }
AbstractStreamResultSetMerger,流式歸併結果集抽象類,提供從當前結果集得到行數據。
public abstract class AbstractStreamResultSetMerger implements ResultSetMerger { /** * 當前結果集 */ private ResultSet currentResultSet; protected ResultSet getCurrentResultSet() throws SQLException { if (null == currentResultSet) { throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next."); } return currentResultSet; } @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { if (Object.class == type) { return getCurrentResultSet().getObject(columnIndex); } if (int.class == type) { return getCurrentResultSet().getInt(columnIndex); } if (String.class == type) { return getCurrentResultSet().getString(columnIndex); } // .... 省略其餘數據類型讀取相似代碼 return getCurrentResultSet().getObject(columnIndex); } }
AbstractMemoryResultSetMerger,內存歸併結果集抽象類,提供從內存數據行對象( MemoryResultSetRow ) 得到行數據。
public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger { private final Map<String, Integer> labelAndIndexMap; /** * 內存數據行對象 */ @Setter private MemoryResultSetRow currentResultSetRow; @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) { throw new SQLFeatureNotSupportedException(); } return currentResultSetRow.getCell(columnIndex); } }
public class MemoryResultSetRow { /** * 行數據 */ private final Object[] data; public MemoryResultSetRow(final ResultSet resultSet) throws SQLException { data = load(resultSet); } /** * 加載 ResultSet 當前行數據到內存 * @param resultSet 結果集 * @return 行數據 * @throws SQLException 當結果集關閉 */ private Object[] load(final ResultSet resultSet) throws SQLException { int columnCount = resultSet.getMetaData().getColumnCount(); Object[] result = new Object[columnCount]; for (int i = 0; i < columnCount; i++) { result[i] = resultSet.getObject(i + 1); } return result; } /** * 獲取數據. * * @param columnIndex 列索引 * @return 數據 */ public Object getCell(final int columnIndex) { Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1); return data[columnIndex - 1]; } /** * 設置數據. * * @param columnIndex 列索引 * @param value 值 */ public void setCell(final int columnIndex, final Object value) { Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1); data[columnIndex - 1] = value; } }
#load()
方法,將當前結果集的一條行數據加載到內存。AbstractDecoratorResultSetMerger,裝飾結果集歸併抽象類,經過調用其裝飾的歸併對象 #getValue()
方法得到行數據。
public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger { /** * 裝飾的歸併對象 */ private final ResultSetMerger resultSetMerger; @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { return resultSetMerger.getValue(columnIndex, type); } }
OrderByStreamResultSetMerger,基於 Stream 方式排序歸併結果集實現。
由於各個分片結果集已經排序完成,使用**《歸併算法》**可以充分利用這個優點。
歸併操做(merge),也叫歸併算法,指的是將兩個已經排序的序列合併成一個序列的操做。歸併排序算法依賴歸併操做。
【迭代法】
- 申請空間,使其大小爲兩個已經排序序列之和,該空間用來存放合併後的序列
- 設定兩個指針,最初位置分別爲兩個已經排序序列的起始位置
- 比較兩個指針所指向的元素,選擇相對小的元素放入到合併空間,並移動指針到下一位置
- 重複步驟3直到某一指針到達序列尾
- 將另外一序列剩下的全部元素直接複製到合併序列尾
從定義上看,是否是超級符合咱們這個場景。😈 此時此刻,你是否是捂着胸口,感嘆:「大學怎麼沒好好學數據結構與算法呢」?反正我是捂着了,都是眼淚。
public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger { /** * 排序列 */ @Getter(AccessLevel.NONE) private final List<OrderItem> orderByItems; /** * 排序值對象隊列 */ private final Queue<OrderByValue> orderByValuesQueue; /** * 默認排序類型 */ private final OrderType nullOrderType; /** * 是否第一個 ResultSet 已經調用 #next() */ private boolean isFirstNext; public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException { this.orderByItems = orderByItems; this.orderByValuesQueue = new PriorityQueue<>(resultSets.size()); this.nullOrderType = nullOrderType; orderResultSetsToQueue(resultSets); isFirstNext = true; } private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException { for (ResultSet each : resultSets) { OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType); if (orderByValue.next()) { orderByValuesQueue.offer(orderByValue); } } // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄 setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet()); }
屬性 orderByValuesQueue
使用的隊列實現是優先級隊列( PriorityQueue )。有興趣的同窗能夠看看《JDK源碼研究PriorityQueue》,本文不展開講,不是主角戲份很少。咱們記住幾個方法的用途:
#offer()
:增長元素。增長時,會將該元素和已有元素們按照優先級進行排序#peek()
:得到優先級第一的元素#pool()
:得到優先級第一的元素並移除一個 ResultSet 構建一個 OrderByValue 用於排序,即上文歸併算法提到的**「空間」**。
public final class OrderByValue implements Comparable<OrderByValue> { /** * 已排序結果集 */ @Getter private final ResultSet resultSet; /** * 排序列 */ private final List<OrderItem> orderByItems; /** * 默認排序類型 */ private final OrderType nullOrderType; /** * 排序列對應的值數組 * 由於一條記錄可能有多個排序列,因此是數組 */ private List<Comparable<?>> orderValues; /** * 遍歷下一個結果集遊標. * * @return 是否有下一個結果集 * @throws SQLException SQL異常 */ public boolean next() throws SQLException { boolean result = resultSet.next(); orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList(); return result; } /** * 得到 排序列對應的值數組 * * @return 排序列對應的值數組 * @throws SQLException 當結果集關閉時 */ private List<Comparable<?>> getOrderValues() throws SQLException { List<Comparable<?>> result = new ArrayList<>(orderByItems.size()); for (OrderItem each : orderByItems) { Object value = resultSet.getObject(each.getIndex()); Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable"); result.add((Comparable<?>) value); } return result; } /** * 對比 {@link #orderValues},即二者的第一條記錄 * * @param o 對比 OrderByValue * @return -1 0 1 */ @Override public int compareTo(final OrderByValue o) { for (int i = 0; i < orderByItems.size(); i++) { OrderItem thisOrderBy = orderByItems.get(i); int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), nullOrderType); if (0 != result) { return result; } } return 0; } }
OrderByValue#next()
方法時,得到其對應結果集排在第一條的記錄,經過 #getOrderValues()
計算該記錄的排序字段值。這樣兩個OrderByValue 經過 #compareTo()
方法能夠比較兩個結果集的第一條記錄。if (orderByValue.next()) {
處,調用 OrderByValue#next()
後,添加到 PriorityQueue。所以,orderByValuesQueue.peek().getResultSet()
可以得到多個 ResultSet 中排在第一的。
經過調用 OrderByStreamResultSetMerger#next()
不斷得到當前排在第一的記錄。#next()
每次調用後,實際作的是當前 ResultSet 的替換,以及當前的 ResultSet 的記錄指向下一條。這樣提及來可能比較繞,咱們來看一張圖:
// OrderByStreamResultSetMerger.java @Override public boolean next() throws SQLException { if (orderByValuesQueue.isEmpty()) { return false; } if (isFirstNext) { isFirstNext = false; return true; } // 移除上一次得到的 ResultSet OrderByValue firstOrderByValue = orderByValuesQueue.poll(); // 若是上一次得到的 ResultSet還有下一條記錄,繼續添加到 排序值對象隊列 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); } if (orderByValuesQueue.isEmpty()) { return false; } // 設置當前 ResultSet setCurrentResultSet(orderByValuesQueue.peek().getResultSet()); return true; }
orderByValuesQueue.poll()
移除上一次得到的 ResultSet。爲何不能 #setCurrentResultSet()
就移除呢?若是該 ResultSet 裏面還存在下一條記錄,須要繼續參加排序。而判斷是否有下一條,須要調用 ResultSet#next()
方法,這會致使 ResultSet 指向了下一條記錄。於是 orderByValuesQueue.poll()
調用是後置的。
isFirstNext
變量那的判斷看着是否是很「靈異」?由於 #orderResultSetsToQueue()
處設置了第一次的 ResultSet。若是不加這個標記,會致使第一條記錄「不見」了。
經過不斷的 Queue#poll()
、Queue#offset()
實現排序。巧妙!彷彿 Get 新技能了:
// 移除上一次得到的 ResultSet OrderByValue firstOrderByValue = orderByValuesQueue.poll(); // 若是上一次得到的 ResultSet還有下一條記錄,繼續添加到 排序值對象隊列 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); }
在看下,咱們上文 Stream 方式歸併的定義:**將數據遊標與結果集的遊標保持一致,順序的從結果集中一條條的獲取正確的數據。**是否是可以清晰的對上了?!🙂
GroupByStreamResultSetMerger,基於 Stream 方式分組歸併結果集實現。 它繼承自 OrderByStreamResultSetMerger,在排序的邏輯上,實現分組功能。實現原理也較爲簡單:
public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger { /** * 查詢列名與位置映射 */ private final Map<String, Integer> labelAndIndexMap; /** * Select SQL語句對象 */ private final SelectStatement selectStatement; /** * 當前結果記錄 */ private final List<Object> currentRow; /** * 下一條結果記錄 GROUP BY 條件 */ private List<?> currentGroupByValues; public GroupByStreamResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(resultSets, selectStatement.getOrderByItems(), nullOrderType); this.labelAndIndexMap = labelAndIndexMap; this.selectStatement = selectStatement; currentRow = new ArrayList<>(labelAndIndexMap.size()); // 初始化下一條結果記錄 GROUP BY 條件 currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues(); } @Override public Object getValue(final int columnIndex, final Class<?> type) throws SQLException { return currentRow.get(columnIndex - 1); } @Override public Object getValue(final String columnLabel, final Class<?> type) throws SQLException { Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel), String.format("Can't find columnLabel: %s", columnLabel)); return currentRow.get(labelAndIndexMap.get(columnLabel) - 1); } }
currentRow
爲當前結果記錄,使用 #getValue()
、#getCalendarValue()
方法得到當前結果記錄的查詢列值。
currentGroupByValues
爲下一條結果記錄 GROUP BY 條件,經過 GroupByValue 生成:
public final class GroupByValue { /** * 分組條件值數組 */ private final List<?> groupValues; public GroupByValue(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException { groupValues = getGroupByValues(resultSet, groupByItems); } /** * 得到分組條件值數組 * 例如,`GROUP BY user_id, order_status` 返回的某條記錄結果爲 `userId = 1, order_status = 3`,對應的 `groupValues = [1, 3]` * @param resultSet 結果集(單分片) * @param groupByItems 分組列 * @return 分組條件值數組 * @throws SQLException 當結果集關閉 */ private List<?> getGroupByValues(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException { List<Object> result = new ArrayList<>(groupByItems.size()); for (OrderItem each : groupByItems) { result.add(resultSet.getObject(each.getIndex())); // 從結果集得到每一個分組條件的值 } return result; } }
GroupByStreamResultSetMerger 在建立時,當前結果記錄實際未合併,須要先調用 #next()
,在使用 #getValue()
等方法獲取值,這個和 OrderByStreamResultSetMerger 不一樣,多是個 BUG。
AggregationUnit,歸併計算單元接口,有兩個接口方法:
#merge()
:歸併聚合值#getResult()
:獲取計算結果一共有三個實現類:
實現都比較易懂,直接點擊連接查看源碼,咱們就不浪費篇幅貼代碼啦。
咱們先看看大致的調用流程:
😈 看起來代碼比較多,邏輯其實比較清晰,對照着順序圖順序往下讀便可。
// GroupByStreamResultSetMerger.java @Override public boolean next() throws SQLException { // 清除當前結果記錄 currentRow.clear(); if (getOrderByValuesQueue().isEmpty()) { return false; } // if (isFirstNext()) { super.next(); } // 順序合併下面相同分組條件的記錄 if (aggregateCurrentGroupByRowAndNext()) { // 生成下一條結果記錄 GROUP BY 條件 currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues(); } return true; } private boolean aggregateCurrentGroupByRowAndNext() throws SQLException { boolean result = false; // 生成計算單元 Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() { @Override public AggregationUnit apply(final AggregationSelectItem input) { return AggregationUnitFactory.create(input.getType()); } }); // 循環順序合併下面相同分組條件的記錄 while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) { // 歸併聚合值 aggregate(aggregationUnitMap); // 緩存當前記錄到結果記錄 cacheCurrentRow(); // 獲取下一條記錄 result = super.next(); if (!result) { break; } } // 設置當前記錄的聚合字段結果 setAggregationValueToCurrentRow(aggregationUnitMap); return result; } private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException { for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) { List<Comparable<?>> values = new ArrayList<>(2); if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) { // SUM/COUNT/MAX/MIN 聚合列 values.add(getAggregationValue(entry.getKey())); } else { for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) { // AVG 聚合列 values.add(getAggregationValue(each)); } } entry.getValue().merge(values); } } private void cacheCurrentRow() throws SQLException { for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) { currentRow.add(getCurrentResultSet().getObject(i + 1)); } } private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException { Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex()); Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable"); return (Comparable<?>) result; } private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) { for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) { currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult()); // 獲取計算結果 } }
GroupByMemoryResultSetMerger,基於 內存 分組歸併結果集實現。
區別於 GroupByStreamResultSetMerger,其沒法使用每一個分片結果集的有序的特色,只能在內存中合併後,進行整個從新排序。於是,性能和內存都較 GroupByStreamResultSetMerger 會差。
主流程以下:
public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger { /** * Select SQL語句對象 */ private final SelectStatement selectStatement; /** * 默認排序類型 */ private final OrderType nullOrderType; /** * 內存結果集 */ private final Iterator<MemoryResultSetRow> memoryResultSetRows; public GroupByMemoryResultSetMerger( final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(labelAndIndexMap); this.selectStatement = selectStatement; this.nullOrderType = nullOrderType; memoryResultSetRows = init(resultSets); } private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException { Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024); // 分組條件值與內存記錄映射 Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024); // 分組條件值與聚合列映射 // 遍歷結果集 for (ResultSet each : resultSets) { while (each.next()) { // 生成分組條件 GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems()); // 初始化分組條件到 dataMap、aggregationMap 映射 initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap); // 歸併聚合值 aggregate(each, groupByValue, aggregationMap); } } // 設置聚合列結果到內存記錄 setAggregationValueToMemoryRow(dataMap, aggregationMap); // 內存排序 List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap); // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄 if (!result.isEmpty()) { setCurrentResultSetRow(result.get(0)); } return result.iterator(); } }
#initForFirstGroupByValue()
初始化分組條件到 dataMap
,aggregationMap
映射中,這樣能夠調用 #aggregate()
將聚合值歸併到 aggregationMap
裏的該分組條件。
private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException { // 初始化分組條件到 dataMap if (!dataMap.containsKey(groupByValue)) { dataMap.put(groupByValue, new MemoryResultSetRow(resultSet)); } // 初始化分組條件到 aggregationMap if (!aggregationMap.containsKey(groupByValue)) { Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() { @Override public AggregationUnit apply(final AggregationSelectItem input) { return AggregationUnitFactory.create(input.getType()); } }); aggregationMap.put(groupByValue, map); } }
聚合完每一個分組條件後,將聚合列結果 aggregationMap
合併到 dataMap
。
private void setAggregationValueToMemoryRow(final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) { for (Entry<GroupByValue, MemoryResultSetRow> entry : dataMap.entrySet()) { // 遍 歷內存記錄 for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { // 遍歷 每一個聚合列 entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult()); } } }
調用 #getMemoryResultSetRows()
方法對內存記錄進行內存排序。
// GroupByMemoryResultSetMerger.java private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) { List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values()); Collections.sort(result, new GroupByRowComparator(selectStatement, nullOrderType)); // 內存排序 return result; } // GroupByRowComparator.java private int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2, final List<OrderItem> orderItems) { for (OrderItem each : orderItems) { Object orderValue1 = o1.getCell(each.getIndex()); Preconditions.checkState(null == orderValue1 || orderValue1 instanceof Comparable, "Order by value must implements Comparable"); Object orderValue2 = o2.getCell(each.getIndex()); Preconditions.checkState(null == orderValue2 || orderValue2 instanceof Comparable, "Order by value must implements Comparable"); int result = ResultSetUtil.compareTo((Comparable) orderValue1, (Comparable) orderValue2, each.getType(), nullOrderType); if (0 != result) { return result; } } return 0; }
@Override public boolean next() throws SQLException { if (memoryResultSetRows.hasNext()) { setCurrentResultSetRow(memoryResultSetRows.next()); return true; } return false; }
memoryResultSetRows
不斷得到下一條記錄。IteratorStreamResultSetMerger,基於 Stream 迭代歸併結果集實現。
public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger { /** * ResultSet 數組迭代器 */ private final Iterator<ResultSet> resultSets; public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) { this.resultSets = resultSets.iterator(); // 設置當前 ResultSet,這樣 #getValue() 能拿到記錄 setCurrentResultSet(this.resultSets.next()); } @Override public boolean next() throws SQLException { // 當前 ResultSet 迭代下一條記錄 if (getCurrentResultSet().next()) { return true; } if (!resultSets.hasNext()) { return false; } // 得到下一個ResultSet, 設置當前 ResultSet setCurrentResultSet(resultSets.next()); boolean hasNext = getCurrentResultSet().next(); if (hasNext) { return true; } while (!hasNext && resultSets.hasNext()) { setCurrentResultSet(resultSets.next()); hasNext = getCurrentResultSet().next(); } return hasNext; } }
LimitDecoratorResultSetMerger,基於 Decorator 分頁結果集歸併實現。
public final class LimitDecoratorResultSetMerger extends AbstractDecoratorResultSetMerger { /** * 分頁條件 */ private final Limit limit; /** * 是否所有記錄都跳過了,即無符合條件記錄 */ private final boolean skipAll; /** * 當前已返回行數 */ private int rowNumber; public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException { super(resultSetMerger); this.limit = limit; skipAll = skipOffset(); } private boolean skipOffset() throws SQLException { // 跳過 skip 記錄 for (int i = 0; i < limit.getOffsetValue(); i++) { if (!getResultSetMerger().next()) { return true; } } // 行數 rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue(); return false; } @Override public boolean next() throws SQLException { if (skipAll) { return false; } // 得到下一條記錄 if (limit.getRowCountValue() > -1) { return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next(); } // 部分db 能夠直 offset,不寫 limit 行數,例如 oracle return getResultSetMerger().next(); } }
#next()
不斷得到下一條記錄。誒?應該是有蠻多地方解釋的不是很清晰,若是讓您閱讀誤解或是阻塞,很是抱歉。代碼讀起來比較易懂,使用文字來解釋,對錶述能力較差的本身,可能就絞盡腦汁,一臉懵逼。
恩,若是能夠,還煩請把讀起來不太爽的地方告訴我,謝謝。
厚着臉皮,道友,分享一波朋友圈可好?
以下是小禮包,嘿嘿
歸併結果集接口 | SQL |
---|---|
OrderByStreamResultSetMerger | SELECT * FROM t_order ORDER BY id |
GroupByStreamResultSetMerger | SELECT uid, AVG(id) FROM t_order GROUP BY uid |
GroupByMemoryResultSetMerger | SELECT uid FROM t_order GROUP BY id ORDER BY id DESC |
IteratorStreamResultSetMerger | SELECT * FROM t_order |
LimitDecoratorResultSetMerger | SELECT * FROM t_order ORDER BY id LIMIT 10 |