在MongoDB中,有兩種方式計算聚合:Pipeline 和 MapReduce。Pipeline查詢速度快於MapReduce,可是MapReduce的強大之處在於可以在多臺Server上並行執行復雜的聚合邏輯。MongoDB不容許Pipeline的單個聚合操做佔用過多的系統內存,若是一個聚合操做消耗20%以上的內存,那麼MongoDB直接中止操做,並向客戶端輸出錯誤消息。mongodb
一,使用 Pipeline 方式計算聚合數組
Pipeline 方式使用db.collection.aggregate()函數進行聚合運算,運算速度較快,操做簡單,可是,Pipeline方式有兩個限制:單個聚合操做消耗的內存不能超過20%,聚合操做返回的結果集必須限制在16MB之內。ide
建立示例數據,在集合 foo中插入1000條doc,每一個doc中有三個field:idx,name 和 age。函數
for(i=0;i<10000;i++) { db.foo.insert({"idx":i,name:"user "+i,age:i%90}); }
1,使用$match 管道符過濾collection中doc,使符合條件的doc進入pipeline,可以減小聚合操做消耗的內存,提升聚合的效率。this
db.foo.aggregate({$match:{age:{$lte:25}}})
2,使用$project 管道符,使用doc中的部分field進入下級pipelinespa
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,idx:1,"_id":0}} )
$project 管道符的做用是選擇字段,重命名字段,派生字段。 code
2.1 選擇字段對象
在$project 管道符中,field:1/0,表示選擇/不選擇 field;將無用的字段從pipeline中過濾掉,可以減小聚合操做對內存的消耗。blog
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,idx:1,"_id":0}} )
2.2 對字段重命名,產生新的字段排序
引用符$,格式是:"$field",表示引用doc中 field 的值,若是要引用內嵌 doc中的字段,使用 "$field1.filed2",表示引用內嵌文檔field1中的字段:field2的值。
示例,新建一個field:preIdx,其值和idx 字段的值是相同的。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":"$idx",idx:1,"_id":0}} )
2.3 派生字段
在$project中,對字段進行計算,根據doc中的字段值和表達式,派生一個新的字段。
示例,preIdx是根據當前doc的idx 減1 獲得的
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project: { age:1, "preIdx":{$subtract:["$idx",1]}, idx:1, "_id":0} } )
在$project 執行算術運算的操做符:+($add),*($multiply),/($divide),%($mod),-($subtract)。
對於字符數據,$substr:[expr,start,length]用於求子字符串;$concat:[expr1,expr2,,,exprn],用於將表達式鏈接在一塊兒;$toLower:expr 和 $toUpper:expr用於返回expr的小寫或大寫形式。
2.4 分組操做
使用$group將doc按照特定的字段的值進行分組,$group將分組字段的值相同的doc做爲一個分組進行聚合計算。若是沒有$group 管道符,那麼全部doc做爲一個分組。對每個分組,都能根據業務邏輯須要計算特定的聚合值。分組操做和排序操做都是非流式的運算符,流式運算符是指:只要有新doc進入,就能夠對doc進行處理,而非流式運算符是指:必須等收到全部的文檔以後,才能對文檔進行處理。分組運算符的處理方式是等接收到全部的doc以後,才能對doc進行分組,而後將各個分組發送給pipeline的下一個運算符進行處理。
示例,按照age進行分組,統計每一個分組中的doc數量
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group:{"_id":"$age",count:{$sum:1}}} )
若是分組字段有多個,按照 age 和 age2 進行分組,這樣作僅僅是爲了演示,在實際的產品環境中,可使用更多的字段用來分組。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group:{"_id":{age:"$age",age2:"$age"},count:{$sum:1}}} )
對每一個分組進行聚合運算,count字段是計算每一個分組中doc的數量,idxTotal字段是計算每一個分組中idx字段值的加和,idxMax字段是計算每一個分組中idx字段值的最大值,idxFirst是計算每一個分組中第一個idx 字段的值,不必定是最小的。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group: { "_id":{age:"$age",age2:"$age"}, count:{$sum:1}, idxTotal:{$sum:"$idx"}}, idxMax:{$max:"$idx"}, idxFirst:{$first:"$idx"} }
} )
2.5,sort操做,limit操做 和 skip操做
對聚合操做的結果進行排序,而後跳過前10個doc,取剩餘結果集的前10個doc。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group: { "_id":{age:"$age",age2:"$age"}, count:{$sum:1}, idxTotal:{$sum:"$idx"}}, idxMax:{$max:"$idx"}, idxFirst:{$first:"$idx"} } }, {$sort:{age:-1}}, {$skip:10}, {$limit:10} )
二,使用MapReduce 方式計算聚合
MapReduce 可以計算很是複雜的聚合邏輯,很是靈活,可是,MapReduce很是慢,不該該用於實時的數據分析中。MapReduce可以在多臺Server上並行執行,每臺Server只負責完成一部分wordload,最後將wordload發送到Master Server上合併,計算出最終的結果集,返回客戶端。
MapReduce分爲兩個階段:Map和Reduce,舉個例子說明,有10節車箱,統計這10節車箱中男生和女生的數量。串行方式一節一節車箱的統計,直到統計徹底部車箱中的人數:男50人,女40人。
使用MapReduce方式的思路是:每一個車箱派一我的去統計,每一個人返回一個doc,例如,keyN:{female:num1,male:num2},keyN是車箱編號,在同一時間,有10我的在同時工做,每一個人只完成所有workload的10%,很快,返回10個doc,從Key1到Key10,只須要將這10個doc中 femal 和 male分別加和到一塊兒,就是所有車箱的人數:男50人,女40人。
使用MapReduce方式計算聚合,主要分爲三步:Map,Shuffle(拼湊)和Reduce,Map和Reduce須要顯式定義,shuffle由MongoDB來實現。
使用MapReduce進行聚合運算的最佳方式是聚合運算的結果可以加到一塊兒,例如,求最大值/最小值,sum,平均值(轉換爲計算每臺Server的 總和sum1,sum2,,,sumN 與 num1,num2,,numN,平均值avg=(sum1+sum2+,,,+sumN)/(num1+num2+,,+numN))等。
示例,使用MapReduce模擬Count,統計集合中的doc的數量
step1,定義Map函數和reduce函數
對於每一個doc,直接返回key 和 一個doc:{count:1}
map=function (){ for(var key in this) { emit(key,{count:1}); } } reduce=function (key,emits){ total=0; for(var i in emits){ total+=emits[i].count; } return {"count":total}; }
step2,執行MapReduce運算
在集合 foo上執行MapReduce運算,返回mr 對象
mr=db.runCommand( { "mapreduce":"foo", "map":map, "reduce":reduce, out:"Count Doc" })
step3,查看MapReduce計算的結果
db[mr.result].find()
示例2,統計集合foo中不一樣age的數量
step1,定義Map 和 Reduce函數
Map函數的做用是對每一個doc進行一次映射,返回age 和 {count:1};
通過Shuffle,每一個age都有一個列表:[{count:1},{count:1},{count:1},{count:1},,,,,],有多少個不一樣的age,MongoDB都會調用多少次Reduce函數,每次調用時,Key值是不一樣的。
Reduce函數的做用:對MongoDB的一次調用,對age對應的列表進行聚合運算。
map=function () { emit(this.age,{count:1}); } reduce= function (key,emits) { total=0; for(var i in emits) { total+=emits[i].count; } return {"age":key,count:total}; }
step2,執行MapReduce聚合運算
mr=db.runCommand( { "mapreduce":"foo", "map":map, "reduce":reduce, out:"Count Doc" })
step3,查看聚合運算的結果
db[mr.result].find()
示例3,研究reduce函數的特性
reduce函數具備累加的特性,經過屢次調用,可以產生最終的累加值,例如,如下reduce函數對於任意一個特定的key,reduce都能計算key的數量
reduce= function (key,emits) { total=0; for(var i in emits) { total+=emits[i].count; } return {"key":key,count:total}; }
調用示例:傳遞的Key是相同的,都是「x」,每一個emits都是一個數組,反覆調用reduce函數,最終得到key的累加值。
r1=reduce("x",[{count:1},{count:2}]) r2=reduce("x",[{count:3},{count:5}]) r3=reduce("x",[r1,r2])
參考doc: