Mongo實時聚合千萬文檔數據

1.前言

大數據的聚合分析在企業中很是有用,有過大數據開發經驗的人都知道ES、Mongo都提供了專門的聚合方案來解決這個問題。可是大量數據的實時聚合一直是業務實現上的痛點,ES、Mongo自然對分佈式友好,每每將海量數據存儲到不一樣的分片上; Go語言天生爲並行而生,數據聚合每每能夠將數據分塊計算,本節結合Go語言的並行計算特性實現1秒聚合千萬mongo文檔數據。筆者對大數據沒有深刻的研究,但願有經驗的讀者可以提出批評和更多建議。本文託管github的源碼地址是: mongo千萬文檔高效聚合javascript

2.mongo數據庫經常使用聚合方法

mongo沒有像mysql⼀樣的範式約束,存儲的能夠是複雜類型,⽐如數組、對象等mysql不善於處理的⽂檔型結構,與此同時聚合的操做也⽐mysql複雜不少。html

mongo提供了三種⽅式完成⽂檔數據聚合操做,本節來總結⼀下三種⽅式的區別:java

  • 聚合框架(aggregate pipeline)
  • 聚合計算模型(MapReduce)
  • 單獨的聚合命令(group、distinct、count)

2.1 單獨的聚合命令

單獨的聚合命令⽐aggregate性能低,⽐Mapreduce靈活度低;使⽤起來簡單。mysql

  • group: 可⽤於⼩數據量的⽂檔聚合運算,⽤於提供⽐count、distinct更豐富的統計需求,可使⽤js函數控制統 計邏輯。

在2.2版本以前,group操做最多隻能返回10000條分組記錄,可是從2.2版本以後到2.4版本,mongodb作了優化,可以⽀持返回20000條分組記錄返回,若是分組記錄的條數⼤於20000條,那麼可能你就須要其餘⽅式進⾏統計了,⽐如聚合管道或者MapReducegit

  • count: db.collection.count() 等同於 db.collection.find().count(), 不能適⽤於分佈式環境,分佈式環境推薦使⽤ aggregategithub

  • distinct: 可使⽤到索引,語法⾮常簡單:db.collection.distinct(field,query),field是去重字段(單個或嵌套字段 名);query是查詢條件sql

2.2 聚合框架 aggregate pipeline

aggregate 聚合框架是基於數據處理管道(pipeline)模型建⽴,⽂檔經過多級管道處理後返回聚合結果;aggregate管道聚合⽅案使⽤的是mongodb內置的彙總操做,相對來講更爲⾼效,在作mongodb數據聚合操做的時候優先推薦aggregate;mongodb

aggregate可以經過索引來提高性能,還有⼀些具體的技巧來管道性能(aggregate 管道操做是在內存中完成的,有內存⼤⼩限制,處理數據集⼤⼩有限);數據庫

aggregate管道操做像unix/Linux系統內的管道操做,將當前⽂檔進⼊第⼀個管道節點處理完成後,將處理完成的數據丟給下⼀個管道節點,⼀直到最後處理完成後,輸出內容;express

aggregate的限制

  1. 當aggregate返回的結果集中的單個⽂檔超過16MB命令會報錯(使⽤aggregate不指定遊標選項或存儲集合中的結果,aggregate命令會返回⼀個包涵於結果集的字段中的bson⽂件。若是結果集的總⼤⼩超過bson⽂件⼤⼩限制(16MB)該命令將產⽣錯誤;)
  2. 管道處理階段有內存限制最⼤不能超過100MB,超過這個限制會報錯誤;爲了可以處理更⼤的數據集能夠開啓allowDiskUse選項,能夠將管道操做寫⼊臨時⽂件;aggregate的使⽤場景適⽤於對聚合響應性能須要⼀定要求的場景(索引及組合優化)

2.3 聚合計算模型 MapReduce

MapReduce的強⼤之處在於可以在多臺Server上並⾏執⾏複雜的聚合邏輯。MapReduce是⼀種計算模型,簡單的說就是將⼤批量的⼯做(數據)分解(MAP)執⾏,而後再將結果合併成最終結果(REDUCE)。MapReduce使⽤慣⽤的javascript操做來作map和reduce操做,所以MapReduce的靈活性和複雜性都會⽐aggregate pipeline更⾼⼀些,而且相對aggregate pipeline⽽⾔更消耗性能;MapReduce⼀般狀況下會使⽤磁盤存儲預處理數 據,⽽pipeline始終在內存處理數據。

MapReduce的使⽤場景 使⽤於處理⼤數據結果集,使⽤javascript靈活度⾼的特色,能夠處理複雜聚合需求

3.aggregate pipeline 實現原理和經常使用語法

代碼使用

List<AggregationOperation> operations = new ArrayList<>();
operations.add(Aggregation.group("name").sum("score").as("totleScore"));
Aggregation aggregation = Aggregation.newAggregation(operations);
mongoTemplate.aggregate(aggregation, getCollectionName(), clazz);
複製代碼

MongoDB中聚合框架(aggregate pipeline)的⽅法使⽤aggregate(),語法以下:

db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

下邊是aggregate()⽅法與mysql聚合類⽐

mongo聚合操做 SQL操做(函數) 說明
$match where 對數據進行條件搜索
$group group by 對數據進行分組聚合
$having having 對聚合後的數據進行過濾篩選
$project select 選擇數據字段
$sort order by 對數據進行排序
$limit limit 限制數據返回數量
$sum sum()、count() 聚合統計數據字段

aggregate中$match、$group等操做被稱爲pipeline中的stage(階段),它們提供了豐富的⽅法來篩選聚合數據,$match提供了$gt(>)、$lt(<)、$in(in)、$nin(not in)、$gte(>=)、$lte(<=)等等篩選符。

$group 按指定的表達式對⽂檔進⾏分組,並將每一個不一樣分組的⽂檔輸出到下⼀個階段。輸出⽂檔包含⼀個_id字段,該字段按鍵包含不一樣的組。輸出⽂檔還能夠包含計算字段,該字段保存由$group的_id字段分組的⼀些accumulator表達式的值。 $group不會輸出具體的⽂檔⽽只是統計信息。語法:

{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }

  • _id字段是必填的;可是,能夠指定_id值爲null來爲整個輸⼊⽂檔計算累計值。
  • 剩餘的計算字段是可選的,並使⽤運算符進⾏計算。

accumulator常⽤操做符:

名稱 描述 sql類比
$avg 計算平均值 avg avg
$first 返回每組第⼀個⽂檔,若是有排序,按照排序,若是沒有按照默認的存儲的順序的第⼀個⽂檔。 limit 0,1
$last 返回每組最後⼀個⽂檔,若是有排序,按照排序,若是沒有按照默認的存儲的順序的最後個⽂檔。 -
$max 根據分組,獲取集合中全部⽂檔對應值得最⼤值。 max
$min 根據分組,獲取集合中全部⽂檔對應值得最⼩值。 min
$sum 計算總和 sum
$push 將指定的表達式的值添加到⼀個數組中。 -

db.collection.aggregate()是基於數據處理的聚合管道,每一個⽂檔經過⼀個由多個階段(stage)組成的管道,能夠對每一個階段的管道進⾏分組、過濾等功能,而後通過⼀系列的處理,輸出相應的結果。經過這張圖,能夠了解Aggregate處理的過程:

聚合管道能夠檢測到是否僅使⽤⽂檔中的⼀部分字段就能夠完成聚合。若是是的話,管道就能夠僅使⽤這些必要的字段,從⽽減小進⼊管道的數據量。

下⾯列舉⼏個常⻅的優化技巧:

  • 1.$match + $group 順序優化 在管道中$group千⾯使⽤$match對⽂檔數據作篩選,能⼤幅度減小單個pipeline返回⽂檔的數量,從⽽提高效率
  • 2.$group + $project 優化 $group 管道對⽂檔數據聚合以後,默認會返回⼀個_id的bson⽂檔,咱們能夠將_id中使⽤到的數據導出,在$project中 只設置限制指定字段,能夠減小輸出⽂檔⼤⼩
  • 3.$skip + $limit 優化 若是你的管道中, $skip 後⾯跟着 $limit ,優化器會把 $limit 移到 $skip 前⾯,這個時候, $limit 的值會加上 $skip 的個數。
  • 4.若是 $sort 在 $limit 前⾯,優化器能夠把 $limit 合併在 $sort 內部。此時若是指定了限定返回 n 個結果,那麼 排序操做僅須要維護最前⾯的 n 個結果,MongoDB只須要在內存中存儲 n 個元素

關於更多的聚合優化技巧,可查看: mongo聚合優化.

4. 代碼實現

4.1 數據整理

本節代碼演示須要用到大量的數據,你們可使用mysql的存儲過程生成海量數據,而後導入到mongo數據庫中,生成方法能夠參考:mysql快速生成百萬數據

{
    "_id" : ObjectId("5e06de309d1f74e9badda0db"),
    "username" : "dvHPRGD1",
    "age" : 87,
    "sex" : 1,
    "salary" : 3251
}
{
    "_id" : ObjectId("5e06de309d1f74e9badda0dc"),
    "username" : "rNx6NsK",
    "age" : 7,
    "sex" : 1,
    "salary" : 7891
}
......
複製代碼

文檔很是簡單,隨機生成了姓名(username)、年齡(age)、性別(sex)、薪資四個內容(salary),其中年齡是0-99的隨機數,性別只有0和1,薪資也是必定範圍的隨機數。

4.2 實現目標和解決思路

有了數據源,咱們的目標也很簡單,就是快速獲得不一樣年齡、不一樣性別的人的總數和薪資平均值。其實就是要求咱們對這千萬數據中年齡和性別作聚合,而後再對薪資作統計計算。

目標明確了以後,咱們給文檔中的年齡和性別添加索引,加快咱們的統計。考慮到咱們要快速得出結果,因此咱們使用mongo的聚合管道aggregate,以前又說過聚合管道有着內存、返回文檔大小的限制,一千萬的數據絕對會超過mongo對內存的限制使用,爲了解決問題,開發人員每每會經過allDiskUse參數打開磁盤來完成數據的聚合工做,可是磁盤和內存的運算效率相差百倍,勢必會影響到聚合效率,沒法保證明時性。

咱們換個思路來解決這個問題,雖然咱們的文檔數據不少,可是年齡是有限的,只有0-99歲100個數,性別也只有男女兩種狀況,咱們可使用go開啓100個goroutine分別聚合age在0-99的文檔數據,聚合完成後再將數據給整合到一塊兒就能夠完成咱們的聚合工做了。go很是適合作這種工做,由於開啓goroutine的代價是不多的,再就是若是數據是分佈式存儲到不一樣的機器上的,又能夠實現數據的分佈式聚合。聚合任務又剛恰好能夠分紅一個個的小任務,爲go語言的並行計算提供了前提條件,一切看起來都是剛恰好。

4.3 代碼解讀

筆者條件有限,就使用本身計算機本地數據庫演示了,首先建立一個mongo鏈接的單例(若是是分佈式搭建的環境,可使用鏈接池)

// mongoAggregate/mongoClient/mongoClient.go
package mongoClient

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type MongoClient struct {
	Client *mongo.Client
	Collection *mongo.Collection
}

var (
	GMongo *MongoClient
)

func InitMongodb()  {
	var(
		ctx context.Context
		opts *options.ClientOptions
		client *mongo.Client
		err error
		collection *mongo.Collection
	)
	// 鏈接數據庫
	ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)  // ctx
	opts = options.Client().ApplyURI("mongodb://127.0.0.1:27017")  // opts
	if client, err = mongo.Connect(ctx,opts); err != nil{
		fmt.Println(err)
		return
	}

	//連接數據庫和表
	collection = client.Database("screen_data_stat").Collection("test")

	//賦值單例
	GMongo = &MongoClient{
		Client:client,
		Collection:collection,
	}
}
...... //入口文件main.go中初始化(init函數) Mongo鏈接
func init() {
	mongoClient.InitMongodb()
}
複製代碼

聚合函數實如今aggregate包中:

package aggregate

import (
	"context"
	"log"
	"mongoAggregate/mongoClient"
	"sync"
	"time"

	bson2 "go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func genPipeline(age int) (bson2.D, bson2.D, bson2.D) {
	matchStage := bson2.D{
		{"$match", bson2.D{
			{"age",
				bson2.D{
					{"$eq", age},
				}},
		}},
	}
	groupStage := bson2.D{
		{"$group", bson2.D{
			{"_id", bson2.D{
				{"age", "$age"},
				{"sex", "$sex"},
			}},
			{"age", bson2.D{
				{"$first", "$age"},
			}},
			{"sex", bson2.D{
				{"$first", "$sex"},
			}},
			{"total", bson2.D{
				{"$sum", 1},
			}},
			{"avgSalary", bson2.D{
				{"$avg", "$salary"},
			}},
		}},
	}
	projectStage := bson2.D{
		{"$project", bson2.D{
			{"_id", 0},
			{"age", 1},
			{"sex", 1},
			{"total", 1},
			{"avgSalary", 1},
		}},
	}

	return matchStage, groupStage, projectStage
}

func DataAggregate(age int, resultChan chan bson2.M, wg *sync.WaitGroup) {
	matchStage, groupStage, projectStage := genPipeline(age)
	opts := options.Aggregate().SetMaxTime(15 * time.Second)
	cursor, err := mongoClient.GMongo.Collection.Aggregate(context.TODO(), mongo.Pipeline{matchStage, groupStage, projectStage}, opts)
	if err != nil {
		log.Fatal(err)
	}

	//打印文檔內容
	var results []bson2.M
	if err = cursor.All(context.TODO(), &results); err != nil {
		log.Fatal(err)
	}
	for _, result := range results {
		resultChan <- result
	}
	wg.Done()
}
複製代碼

genPipeline方法用於生成mongo聚合管道的各個階段,由於go語言能夠返回多值,因此在DataAggregate中使用多值接收,將聚合後的結果經過通道resultChan傳出去,完成聚合,sync.WaitGroup是爲了控制主函數先於其餘goroutine先退出而設置的,用於控制併發數量。

由於咱們使用多個goroutine併發運算,咱們獲得結果實際上取決於最慢的那個goroutine完成任務所消耗的時間,咱們對結果進行以下處理:排序、格式化爲json,那麼咱們就須要對輸出內容作以下定義:

//output/resultSlice.go
package output

// 按照 Person.Age 從大到小排序
type OutPut struct {
	Age int32 `json:"age"`
	Sex int32 `json:"sex"`
	Total int32 `json:"total"`
	AvgSalary float64 `json:"avg_salary"`
}

type ResultSlice [] OutPut

func (a ResultSlice) Len() int { // 重寫 Len() 方法
	return len(a)
}
func (a ResultSlice) Swap(i, j int) { // 重寫 Swap() 方法
	a[i], a[j] = a[j], a[i]
}
func (a ResultSlice) Less(i, j int) bool { // 重寫 Less() 方法, 從大到小排序
	return a[j].Age < a[i].Age
}
複製代碼

上邊實現了排序函數接口,咱們就能夠實現輸出結果根據年齡作排序了。

接下來主函數所作的工做就比較清晰了:

func main() {
	dataStatResult := make(chan bson2.M)
	var output output2.ResultSlice
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go aggregate.DataAggregate(i, dataStatResult, &wg)
	}

	for value := range dataStatResult {
		output = append(output, output2.OutPut{
			Age:       value["age"].(int32),
			Sex:       value["sex"].(int32),
			Total:     value["total"].(int32),
			AvgSalary: value["avgSalary"].(float64),
		})
		if len(output) == 200 {
			break
		}
	}
	wg.Wait()
	//倒序排列
	sort.Sort(output)
	for _, v := range output {
		result, err := json.Marshal(&v)
		if err != nil {
			fmt.Printf("json.marshal failed, err:", err)
			return
		}
		fmt.Println(string(result))
	}
}
複製代碼

首先定義一個管道,用於主goroutine和其餘併發goroutine通訊,用於接受其餘goroutine計算好的結果,例子中開啓100個goroutine進行分組聚合,聚合後的結果經過dataStatResult通道接收,轉化爲Output結構體,存放到切片中,全部的工做完成以後,對結果按照年齡排序,格式化爲json輸出。這就是併發聚合海量數據的邏輯了。下邊是筆者聚合0-20歲的結果(數據大概有200萬,200ms就完成了聚合工做):

{"age":19,"sex":0,"total":49773,"avg_salary":5346.04197054628}
{"age":19,"sex":1,"total":49985,"avg_salary":4677.7744523357005}
{"age":18,"sex":0,"total":48912,"avg_salary":5335.430671409879}
{"age":18,"sex":1,"total":50136,"avg_salary":4540.624461464816}
{"age":17,"sex":0,"total":49609,"avg_salary":5372.679755689492}
......
複製代碼

5.小結

本文主要講述了在大數據聚合統計場景下的應用,其實不論是不是對實時性有要求的場景都有着分塊聚合思想的存在,mongo的MapReduce聚合、ES的bucketing(桶聚合),都是將大數據聚合分批成小任務,一個個完成,最終完成目標,它們的效率並不高。

6.原文

juejin.im/post/5e0b58…

相關文章
相關標籤/搜索