咱們須要查看數據的統計量時,均值是最重要的特徵之一。前端
對於海量數據,這類簡單的聚合ES能夠作到秒級別返回。聚合是ES的特點功能。設計模式
那麼ES是如何實現這一功能的呢? ide
咱們知道,ES的數據存儲在各個節點中, 因此ES的實現AvgAggregation時基本思路就是先統計各個節點,而後彙總。函數
先了解ES是如何統計單個節點: 參考AvgAggregatoroop
@Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { counts = bigArrays.grow(counts, bucket + 1); sums = bigArrays.grow(sums, bucket + 1); values.setDocument(doc); final int valueCount = values.count(); counts.increment(bucket, valueCount); double sum = 0; for (int i = 0; i < valueCount; i++) { sum += values.valueAt(i); } sums.increment(bucket, sum); } }; }
即實現Collector類的collect()方法。而後經過doc_values
機制獲取文檔相關字段的值,分別匯入counts和sums兩個變量中。線程
收集完成counts和sums事後,就須要彙總各個節點的值, 這在搜索的第二階段。 翻譯
從第一階段到第二階段,整個鏈路以下:
s1: 前端請求發送到集羣某一節點的TransportSearchAction.doExecute()
方法中。設計
switch(searchRequest.searchType()) { ..... case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); break; ...... } searchAsyncAction.start();
見到start()方法,我覺得這個是另啓一個線程,後面發現原來不是的。 這個start()方法把整個查詢過程分爲兩個階段:code
階段一:
performFirstPhase(), 即把請求分發到各個節點,而後記錄節點處理的結果。若是返回的分片是最後一個分片,則轉入階段二。orm
階段二:
performFirstPhase() -> onFirstPhaseResult() -> innerMoveToSecondPhase() -> moveToSecondPhase() 。這裏利用了模板設計模式。在階段二中,會再次向各個節點發起請求,經過docId獲取文檔內容。
s2: 對於聚合而言, 階段二最重要的鏈路是moveToSecondPhase() -> executeFetch() -> finishHim() -> searchPhaseController.merge() , merge()中包含了以下的業務邏輯: 合併hits, 合併suggest, 合併addAggregation 等。 這裏咱們關注聚合。
聚合的入口方法是InternalAggregations.reduce()
, 若是熟悉hadoop, reduce方法的執行邏輯看這個名字也能理解一部分。reduce的中文翻譯「概括」,挺生動形象的。整個鏈路的入口爲InternalAvg.doReduce()
。
@Override public InternalAvg doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { long count = 0; double sum = 0; for (InternalAggregation aggregation : aggregations) { count += ((InternalAvg) aggregation).count; sum += ((InternalAvg) aggregation).sum; } return new InternalAvg(getName(), sum, count, valueFormatter, pipelineAggregators(), getMetaData()); }
其邏輯至關簡單,count相加, sum相加。獲取最終的結果就是
public double getValue() { return sum / count; }
上面講述了ES分發會彙總的關鍵節點,那麼分發到各個節點的業務邏輯是怎樣的呢?
首先定位入口:
class SearchQueryTransportHandler extends TransportRequestHandler<ShardSearchTransportRequest> { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { QuerySearchResultProvider result = searchService.executeQueryPhase(request); channel.sendResponse(result); } }
而後定位到QueryPhrase.execute()
, 在QueryPhrase這個階段,主要作的事情以下:
aggregationPhase.preProcess(searchContext)
: 解析ES的語法,生成Collector.execute
: 在調用Lucene的接口查詢數據前,組合各個Collecotr, collector = MultiCollector.wrap(subCollectors);
而後查詢Lucene索引。對於AvgAggregator, 其關鍵邏輯是:
@Override public void collect(int doc, long bucket) throws IOException { counts = bigArrays.grow(counts, bucket + 1); sums = bigArrays.grow(sums, bucket + 1); values.setDocument(doc); final int valueCount = values.count(); counts.increment(bucket, valueCount); double sum = 0; for (int i = 0; i < valueCount; i++) { sum += values.valueAt(i); } sums.increment(bucket, sum); }
這個已是第二次出現了, 它的功能就是收集每一個命中查詢的doc相關信息。 這裏獲取每一個docId對應的value,是基於doc_value的正向索引。
以上就是整個Avg Aggregation的實現流程。 經過源碼,能夠確認, AvgAggregation是精確可信的。 還有幾個聚合函數,其思路跟AvgAggregation是一致的,就不細說了,他們分別是: Max, Min, Sum, ValueCount, Stats 。。。