MongoDB 聚合管道(Aggregation Pipeline)

管道概念

POSIX多線程的使用方式中, 有一種很重要的方式-----流水線(亦稱爲「管道」)方式,「數據元素」流串行地被一組線程按順序執行。它的使用架構可參考下圖:mongodb

clip_image002

以面向對象的思想去理解,整個流水線,能夠理解爲一個數據傳輸的管道;該管道中的每個工做線程,能夠理解爲一個整個流水線的一個工做階段stage,這些工做線程之間的合做是一環扣一環的。靠輸入口越近的工做線程,是時序較早的工做階段stage,它的工做成果會影響下一個工做線程階段(stage)的工做結果,即下個階段依賴於上一個階段的輸出,上一個階段的輸出成爲本階段的輸入。這也是pipeline的一個共有特色!shell

爲了迴應用戶對簡單數據訪問的需求,MongoDB2.2版本引入新的功能聚合框架(Aggregation Framework) ,它是數據聚合的一個新框架,其概念相似於數據處理的管道。 每一個文檔經過一個由多個節點組成的管道,每一個節點有本身特殊的功能(分組、過濾等),文檔通過管道處理後,最後輸出相應的結果。管道基本的功能有兩個:數據庫

一是對文檔進行「過濾」,也就是篩選出符合條件的文檔;express

二是對文檔進行「變換」,也就是改變文檔的輸出形式。json

其餘的一些功能還包括按照某個指定的字段分組和排序等。並且在每一個階段還可使用表達式操做符計算平均值和拼接字符串等相關操做。管道提供了一個MapReduce 的替代方案,MapReduce使用相對來講比較複雜,而管道的擁有固定的接口(操做符表達),使用比較簡單,對於大多數的聚合任務管道通常來講是首選方法。數組

該框架使用聲明性管道符號來支持相似於SQL Group By操做的功能,而再也不須要用戶編寫自定義的JavaScript例程。多線程

大部分管道操做會在「aggregate」子句後會跟上「$match」打頭。它們用在一塊兒,就相似於SQL的from和where子句,或是MongoDB的find函數。「$project」子句看起來也很是相似SQL或MongoDB中的某個概念(和SQL不一樣的是,它位於表達式尾端)。架構

接下來介紹的操做在MongoDB聚合框架中是獨一無二的。與大多數關係數據庫不一樣,MongoDB天生就能夠在行/文檔內存儲數組。儘管該特性對於全有全無的數據訪問十分便利,可是它對於須要組合投影、分組和過濾操做來編寫報告的工做,卻顯得至關複雜。「$unwind」子句將數組分解爲單個的元素,並與文檔的其他部分一同返回。框架

「$group」操做與SQL的Group By子句用途相同,可是使用起來卻更像是LINQ中的分組運算符。與取回一行平面數據不一樣,「$group」操做的結果集會呈現爲一個持續的嵌套結構。正因如此,使用「$group」能夠返回聚合信息,例如對於每一個分組中的實際文檔,計算文檔總體或部分的數目和平均值。ide

管道操做符

管道是由一個個功能節點組成的,這些節點用管道操做符來進行表示。聚合管道以一個集合中的全部文檔做爲開始,而後這些文檔從一個操做節點 流向下一個節點 ,每一個操做節點對文檔作相應的操做。這些操做可能會建立新的文檔或者過濾掉一些不符合條件的文檔,在管道中能夠對文檔進行重複操做。

先看一個管道聚合的例子:

clip_image004

管道操做符的種類:

Name

Description

$project

Reshapes a document stream. $project can rename, add, or remove fields as well as create computed values and sub-documents.

$match

Filters the document stream, and only allows matching documents to pass into the next pipeline stage.$match uses standard MongoDB queries.

$limit

Restricts the number of documents in an aggregation pipeline.

$skip

Skips over a specified number of documents from the pipeline and returns the rest.

$unwind

Takes an array of documents and returns them as a stream of documents.

$group

Groups documents together for the purpose of calculating aggregate values based on a collection of documents.

$sort

Takes all input documents and returns them in a stream of sorted documents.

$geoNear

Returns an ordered stream of documents based on proximity to a geospatial point.

管道操做符詳細使用說明

  1.  $project: 數據投影,主要用於重命名、增長和刪除字段

例如:

db.article.aggregate(

{ $project : {

title : 1 ,

author : 1 ,

}}

);

這樣的話結果中就只還有_id,tilte和author三個字段了,默認狀況下_id字段是被包含的,若是要想不包含_id話能夠這樣:

db.article.aggregate(

{ $project : {

_id : 0 ,

title : 1 ,

author : 1

}});

也能夠在$project內使用算術類型表達式操做符,例如:

db.article.aggregate(

{ $project : {

title : 1,

doctoredPageViews : { $add:["$pageViews", 10] }

}});

經過使用$add給pageViews字段的值加10,而後將結果賦值給一個新的字段:doctoredPageViews

注:必須將$add計算表達式放到中括號裏面

除此以外使用$project還能夠重命名字段名和子文檔的字段名:

db.article.aggregate(

{ $project : {

title : 1 ,

page_views : "$pageViews" ,

bar : "$other.foo"

}});

也能夠添加子文檔:

db.article.aggregate(

{ $project : {

title : 1 ,

stats : {

pv : "$pageViews",

foo : "$other.foo",

dpv : { $add:["$pageViews", 10] }

}

}});

產生了一個子文檔stats,裏面包含pv,foo,dpv三個字段。

2.$match: 濾波操做,篩選符合條件文檔,做爲下一階段的輸入

   $match的語法和查詢表達式(db.collection.find())的語法相同

db.articles.aggregate( [

{ $match : { score : { $gt : 70, $lte : 90 } } },

{ $group: { _id: null, count: { $sum: 1 } } }

] );

   $match用於獲取分數大於70小於或等於90記錄,而後將符合條件的記錄送到下一階段$group管道操做符進行處理。

注意:1.不能在$match操做符中使用$where表達式操做符。

          2.$match儘可能出如今管道的前面,這樣能夠提前過濾文檔,加快聚合速度。

          3.若是$match出如今最前面的話,可使用索引來加快查詢。

3.  $limit:  限制通過管道的文檔數量

     $limit的參數只能是一個正整數

db.article.aggregate(

{ $limit : 5 });

這樣的話通過$limit管道操做符處理後,管道內就只剩下前5個文檔了

4. $skip: 從待操做集合開始的位置跳過文檔的數目

    $skip參數也只能爲一個正整數

db.article.aggregate(

{ $skip : 5 });

通過$skip管道操做符處理後,前五個文檔被「過濾」掉

5.$unwind:將數組元素拆分爲獨立字段

例如:article文檔中有一個名字爲tags數組字段:

> db.article.find()
  { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),

"author" : "Jone", "title" : "Abook",

"tags" : [  "good",  "fun",  "good" ] }

使用$unwind操做符後:

> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tags"})
{
        "result" : [
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
"tags" : "good"
                },
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
"tags" : "fun"
                },
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
  "tags" : "good"
                }
        ],
        "ok" : 1
}

注意:a.{$unwind:"$tags"})不要忘了$符號

          b.若是$unwind目標字段不存在的話,那麼該文檔將被忽略過濾掉,例如:

     > db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tag"})
    { "result" : [ ], "ok" : 1 }
將$tags改成$tag因不存在該字段,該文檔被忽略,輸出的結果爲空

        c.若是$unwind目標字段不是一個數組的話,將會產生錯誤,例如:

  > db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$title"})

    Error: Printing Stack Trace
    at printStackTrace (src/mongo/shell/utils.js:37:15)
    at DBCollection.aggregate (src/mongo/shell/collection.js:897:9)
    at (shell):1:12
    Sat Nov 16 19:16:54.488 JavaScript execution failed: aggregate failed: {
        "errmsg" : "exception: $unwind:  value at end of field path must be an array",
        "code" : 15978,
        "ok" : 0
} at src/mongo/shell/collection.js:L898

      d.若是$unwind目標字段數組爲空的話,該文檔也將會被忽略。

  6.$group 對數據進行分組

    $group的時候必需要指定一個_id域,同時也能夠包含一些算術類型的表達式操做符:

db.article.aggregate(

{ $group : {

_id : "$author",

docsPerAuthor : { $sum : 1 },

viewsPerAuthor : { $sum : "$pageViews" }

}});

注意:  1.$group的輸出是無序的。

          2.$group操做目前是在內存中進行的,因此不能用它來對大量個數的文檔進行分組。

7.$sort : 對文檔按照指定字段排序

使用方式以下:

db.users.aggregate( { $sort : { age : -1, posts: 1 } });

按照年齡進行降序操做,按照posts進行升序操做

注意:1.若是將$sort放到管道前面的話能夠利用索引,提升效率

        2.MongoDB 24.對內存作了優化,在管道中若是$sort出如今$limit以前的話,$sort只會對前$limit個文檔進行操做,這樣在內存中也只會保留前$limit個文檔,從而能夠極大的節省內存

        3.$sort操做是在內存中進行的,若是其佔有的內存超過物理內存的10%,程序會產生錯誤

8.$goNear

        $goNear會返回一些座標值,這些值以按照距離指定點距離由近到遠進行排序

具體使用參數見下表:

Field

Type

Description

near

GeoJSON point orlegacy coordinate pairs

The point for which to find the closest documents.

distanceField

string

The output field that contains the calculated distance. To specify a field within a subdocument, use dot notation.

limit

number

Optional. The maximum number of documents to return. The default value is 100. See also the num option.

num

number

Optional. The num option provides the same function as the limitoption. Both define the maximum number of documents to return. If both options are included, the num value overrides the limit value.

maxDistance

number

Optional. A distance from the center point. Specify the distance in radians. MongoDB limits the results to those documents that fall within the specified distance from the center point.

query

document

Optional. Limits the results to the documents that match the query. The query syntax is the usual MongoDB read operation query syntax.

spherical

Boolean

Optional. If true, MongoDB references points using a spherical surface. The default value is false.

distanceMultiplier

number

Optional. The factor to multiply all distances returned by the query. For example, use the distanceMultiplier to convert radians, as returned by a spherical query, to kilometers by multiplying by the radius of the Earth.

includeLocs

string

Optional. This specifies the output field that identifies the location used to calculate the distance. This option is useful when a location field contains multiple locations. To specify a field within a subdocument, usedot notation.

uniqueDocs

Boolean

Optional. If this value is true, the query returns a matching document once, even if more than one of the document’s location fields match the query. If this value is false, the query returns a document multiple times if the document has multiple matching location fields. See $uniqueDocsfor more information.

例如:

db.places.aggregate([

{

$geoNear: {

near: [40.724, -73.997],

distanceField: "dist.calculated",

maxDistance: 0.008,

query: { type: "public" },

includeLocs: "dist.location",

uniqueDocs: true,

num: 5

}

}

])

其結果爲:

{

"result" : [

{ "_id" : 7,

"name" : "Washington Square",

"type" : "public",

"location" : [

[ 40.731, -73.999 ],

[ 40.732, -73.998 ],

[ 40.730, -73.995 ],

[ 40.729, -73.996 ]

],

"dist" : {

"calculated" : 0.0050990195135962296,

"location" : [ 40.729, -73.996 ]

}

},

{ "_id" : 8,

"name" : "Sara D. Roosevelt Park",

"type" : "public",

"location" : [

[ 40.723, -73.991 ],

[ 40.723, -73.990 ],

[ 40.715, -73.994 ],

[ 40.715, -73.994 ]

],

"dist" : {

"calculated" : 0.006082762530298062,

"location" : [ 40.723, -73.991 ]

}

}

],

"ok" : 1}

其中,dist.calculated中包含了計算的結果,而dist.location中包含了計算距離時實際用到的座標

注意: 1.使用$goNear只能在管道處理的開始第一個階段進行

         2.必須指定distanceField,該字段用來決定是否包含距離字段

3.$gonNear和geoNear命令比較類似,可是也有一些不一樣:distanceField在$geoNear中是必選的,而在geoNear中是可選的;includeLocs在$geoNear中是string類型,而在geoNear中是boolen類型。

管道表達式

管道操做符做爲「鍵」,所對應的「值」叫作管道表達式。例如上面例子中{$match:{status:"A"}},$match稱爲管道操做符,而{status:"A"}稱爲管道表達式,它能夠看做是管道操做符的操做數(Operand),每一個管道表達式是一個文檔結構,它是由字段名、字段值、和一些表達式操做符組成的,例如上面例子中管道表達式就包含了一個表達式操做符$sum進行累加求和。

每一個管道表達式只能做用於處理當前正在處理的文檔,而不能進行跨文檔的操做。管道表達式對文檔的處理都是在內存中進行的。除了可以進行累加計算的管道表達式外,其餘的表達式都是無狀態的,也就是不會保留上下文的信息。累加性質的表達式操做符一般和$group操做符一塊兒使用,來統計該組內最大值、最小值等,例如上面的例子中咱們在$group管道操做符中使用了具備累加的$sum來計算總和。

除了$sum覺得,還有如下性質的表達式操做符:

組聚合操做符

Name

Description

$addToSet

Returns an array of all the unique values for the selected field among for each document in that group.

$first

Returns the first value in a group.

$last

Returns the last value in a group.

$max

Returns the highest value in a group.

$min

Returns the lowest value in a group.

$avg

Returns an average of all the values in a group.

$push

Returns an array of all values for the selected field among for each document in that group.

$sum

Returns the sum of all the values in a group.

Bool類型聚合操做符

Name

Description

$and

Returns true only when all values in its input array are true.

$or

Returns true when any value in its input array are true.

$not

Returns the boolean value that is the opposite of the input value.

比較類型聚合操做符

Name

Description

$cmp

Compares two values and returns the result of the comparison as an integer.

$eq

Takes two values and returns true if the values are equivalent.

$gt

Takes two values and returns true if the first is larger than the second.

$gte

Takes two values and returns true if the first is larger than or equal to the second.

$lt

Takes two values and returns true if the second value is larger than the first.

$lte

Takes two values and returns true if the second value is larger than or equal to the first.

$ne

Takes two values and returns true if the values are not equivalent.

算術類型聚合操做符

Name

Description

$add

Computes the sum of an array of numbers.

$divide

Takes two numbers and divides the first number by the second.

$mod

Takes two numbers and calcualtes the modulo of the first number divided by the second.

$multiply

Computes the product of an array of numbers.

$subtract

Takes two numbers and subtracts the second number from the first.

字符串類型聚合操做符

Name

Description

$concat

Concatenates two strings.

$strcasecmp

Compares two strings and returns an integer that reflects the comparison.

$substr

Takes a string and returns portion of that string.

$toLower

Converts a string to lowercase.

$toUpper

Converts a string to uppercase.

日期類型聚合操做符

Name

Description

$dayOfYear

Converts a date to a number between 1 and 366.

$dayOfMonth

Converts a date to a number between 1 and 31.

$dayOfWeek

Converts a date to a number between 1 and 7.

$year

Converts a date to the full year.

$month

Converts a date into a number between 1 and 12.

$week

Converts a date into a number between 0 and 53

$hour

Converts a date into a number between 0 and 23.

$minute

Converts a date into a number between 0 and 59.

$second

Converts a date into a number between 0 and 59. May be 60 to account for leap seconds.

$millisecond

Returns the millisecond portion of a date as an integer between 0 and 999.

條件類型聚合操做符

Name

Description

$cond

A ternary operator that evaluates one expression, and depending on the result returns the value of one following expressions.

$ifNull

Evaluates an expression and returns a value.

注:以上操做符都必須在管道操做符的表達式內來使用。

各個表達式操做符的具體使用方式參見:

http://docs.mongodb.org/manual/reference/operator/aggregation-group/

聚合管道的優化

   1.$sort  +  $skip  +  $limit順序優化

若是在執行管道聚合時,若是$sort、$skip、$limit依次出現的話,例如:

{ $sort: { age : -1 } },

{ $skip: 10 },

{ $limit: 5 }

那麼實際執行的順序爲:

{ $sort: { age : -1 } },

{ $limit: 15 },

{ $skip: 10 }

$limit會提早到$skip前面去執行。

此時$limit = 優化前$skip+優化前$limit

這樣作的好處有兩個:1.在通過$limit管道後,管道內的文檔數量個數會「提早」減少,這樣會節省內存,提升內存利用效率。2.$limit提早後,$sort緊鄰$limit這樣的話,當進行$sort的時候當獲得前「$limit」個文檔的時候就會中止。

2.$limit + $skip + $limit + $skip Sequence Optimization

若是聚合管道內反覆出現下面的聚合序列:

  { $limit: 100 },

  { $skip: 5 },

  { $limit: 10},

  { $skip: 2 }

首先進行局部優化爲:能夠按照上面所講的先將第二個$limit提早:

{ $limit: 100 },

  { $limit: 15},

  { $skip: 5 },

  { $skip: 2 }

進一步優化:兩個$limit能夠直接取最小值 ,兩個$skip能夠直接相加:

{ $limit: 15 },

  { $skip: 7 }

3.Projection Optimization

過早的使用$project投影,設置須要使用的字段,去掉不用的字段,能夠大大減小內存。除此以外也能夠過早使用

咱們也應該過早使用$match、$limit、$skip操做符,他們能夠提早減小管道內文檔數量,減小內存佔用,提供聚合效率。

除此以外,$match儘可能放到聚合的第一個階段,若是這樣的話$match至關於一個按條件查詢的語句,這樣的話可使用索引,加快查詢效率。

聚合管道的限制

    1.類型限制

在管道內不能操做 Symbol, MinKey, MaxKey, DBRef, Code, CodeWScope類型的數據( 2.4版本解除了對二進制數據的限制).

     2.結果大小限制

管道線的輸出結果不能超過BSON 文檔的大小(16M),若是超出的話會產生錯誤.

     3.內存限制

若是一個管道操做符在執行的過程當中所佔有的內存超過系統內存容量的10%的時候,會產生一個錯誤。

當$sort和$group操做符執行的時候,整個輸入都會被加載到內存中,若是這些佔有內存超過系統內存的%5的時候,會將一個warning記錄到日誌文件。一樣,所佔有的內存超過系統內存容量的10%的時候,會產生一個錯誤。

分片上使用聚合管道

聚合管道支持在已分片的集合上進行聚合操做。當分片集合上進行聚合操縱的時候,聚合管道被分爲兩成兩個部分,分別在mongod實例和mongos上進行操做。

聚合管道使用

首先下載測試數據:http://media.mongodb.org/zips.json 並導入到數據庫中。

1.查詢各州的人口數

var connectionString = ConfigurationManager.AppSettings["MongodbConnection"];

var client = new MongoClient(connectionString);

var DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];

string collName = ConfigurationManager.AppSettings["collName"];

MongoServer mongoDBConn = client.GetServer();

MongoDatabase db = mongoDBConn.GetDatabase(DatabaseName);

MongoCollection<BsonDocument> table = db[collName];

var group = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id","$state"

},

{

"totalPop", new BsonDocument

{

{ "$sum","$pop" }

}

}

}

}

};

var sort = new BsonDocument

{

{"$sort", new BsonDocument{ { "_id",1 }}}

};

var pipeline = new[] { group, sort };

var result = table.Aggregate(pipeline);

var matchingExamples = result.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples)

{

var message = string.Format("{0}- {1}", example["_id"], example["totalPop"]);

Console.WriteLine(message);

}

2.計算每一個州平均每一個城市打人口數

> db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},

                              {$group:{_id:"$_id.state",avCityPop:{$avg:"$pop"}}},

                                       {$sort:{_id:1}})

var group1 = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id",new BsonDocument

{

{"state","$state"},

{"city","$city"}

}

},

{

"pop", new BsonDocument

{

{ "$sum","$pop" }

}

}

}

}

};

var group2 = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id","$_id.state"

},

{

"avCityPop", new BsonDocument

{

{ "$avg","$pop" }

}

}

}

}

};

var pipeline1 = new[] { group1,group2, sort };

var result1 = table.Aggregate(pipeline1);

var matchingExamples1 = result1.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples1)

{

var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);

Console.WriteLine(message);

}

3.計算每一個州人口最多和最少的城市名字

>db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},

                                      {$sort:{pop:1}},

                                      {$group:{_id:"$_id.state",biggestCity:{$last:"$_id.city"},biggestPop:{$last:"$pop"},smallestCity:{$first:"$_id.city"},smallestPop:{$first:"$pop"}}},

                                      {$project:{_id:0,state:"$_id",biggestCity:{name:"$biggestCity",pop:"$biggestPop"},smallestCity:{name:"$smallestCity",pop:"$smallestPop"}}})

var sort1 = new BsonDocument

{

{"$sort", new BsonDocument{ { "pop",1 }}}

};

var group3 = new BsonDocument

{

{

"$group", new BsonDocument

{

{

"_id","$_id.state"

},

{

"biggestCity",new BsonDocument

{

{"$last","$_id.city"}

}

},

{

"biggestPop",new BsonDocument

{

{"$last","$pop"}

}

},

{

"smallestCity",new BsonDocument

{

{"$first","$_id.city"}

}

},

{

"smallestPop",new BsonDocument

{

{"$first","$pop"}

}

}

}

}

};

var project = new BsonDocument

{

{

"$project", new BsonDocument

{

{"_id",0},

{"state","$_id"},

{"biggestCity",new BsonDocument

{

{"name","$biggestCity"},

{"pop","$biggestPop"}

}},

{"smallestCity",new BsonDocument

{

{"name","$smallestCity"},

{"pop","$smallestPop"}

}

}

}

}

};

var pipeline2 = new[] { group1,sort1 ,group3, project };

var result2 = table.Aggregate(pipeline2);

var matchingExamples2 = result2.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples2)

{

Console.WriteLine(example.ToString());

//var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);

//Console.WriteLine(message);

}

總結

對於大多數的聚合操做,聚合管道能夠提供很好的性能和一致的接口,使用起來比較簡單, 和MapReduce同樣,它也能夠做用於分片集合,可是輸出的結果只能保留在一個文檔中,要遵照BSON Document大小限制(當前是16M)。

管道對數據的類型和結果的大小會有一些限制,對於一些簡單的固定的彙集操做可使用管道,可是對於一些複雜的、大量數據集的聚合任務仍是使用MapReduce。

相關文章:

http://mikaelkoskinen.net/mongodb-aggregation-framework-examples-in-c/

相關文章
相關標籤/搜索