MongoSpark爲入口類,調用MongoSpark.load,該方法返回一個MongoRDD類對象,Mongo Spark Connector框架本質上就是一個大號的自定義RDD,加了些自定義配置、適配幾種分區器規則、Sql的數據封裝等等,我的認爲相對核心的也就是分區器的規則實現;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。數組
MongoPaginateByCountPartitioner 基於總數的分頁分區器
MongoPaginateBySizePartitioner 基於大小的分頁分區器
MongoSamplePartitioner 基於採樣的分區器
MongoShardedPartitioner 基於分片的分區器
MongoSinglePartitioner 單分區分區器
MongoSplitVectorPartitioner 基於分割向量的分區器框架
這裏根據源碼簡單介紹MongoSinglePartitioner與MongoSamplePartitioner分區器,這或許就是用得最多的兩種分區器,他的默認分區器(DefaultMongoPartitioner)就是MongoSamplePartitioner分區器;
該分區默認的PartitionKey爲_id、默認PartitionSizeMB爲64MB、默認每一個分區採樣爲10;ui
該類的核心也是惟一的方法爲:partitions方法,下面爲該方法的執行流程與核心邏輯;
一、檢查執行buildInfo指令檢查Mongo版本用於判斷是否支持隨機採樣聚合運算,版本大於3.2。 hasSampleAggregateOperator方法。Mongo3.2版本中才新增了數據採樣功能。
Mongodb中的語法爲:3d
db.cName.aggregate([ {$sample:{ size: 10 } } ])
上示例N等於10,若是N大於collection中總數據的5%,那麼$sample將會執行collection掃描、sort,而後選擇top N條文檔;若是N小於5%,對於wiredTiger而言則會遍歷collection並使用「僞隨機」的方式選取N條文檔,對於MMAPv1引擎則在_id索引上隨機選取N條文檔。
二、執行collStats,用於獲取集合的存儲信息,如行數、大小、存儲大小等等信息;
matchQuery: 過濾條件
partitionerOptions: ReadConfig傳進去的分析器選項
partitionKey: 分區key,默認爲_id
partitionSizeInBytes: 分區大小,默認64MB
samplesPerPartition: 每一個分區默認採樣數量,默認10
count: 集合總條數
avgObjSizeInBytes: 對象平均字節數
numDocumentsPerPartition: 每一個分區文檔數, partitionSizeInBytes / avgObjSizeInBytes:分區大小/對象平均大小
numberOfSamples: 採樣數量,samplesPerPartition * count / numDocumentsPerPartition,每一個分區採樣數*集合總數/每一個分區文檔數code
如每一個分區文檔數大於集合總文檔數,則將直接建立單分區,不採起採樣數據方式建立分區,由於此時數據量太少單個分區已經能夠容得下無需多個分區;對象
在MongoSinglePartitioner類中經過PartitionerHelper.createPartitions執行相關邏輯;
_id做爲partitionKey,blog
指定採樣條件、採樣數據量、PartitionKey、排序條件等,獲取採樣數據;
集合拆分:排序
def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1
右側邊界:索引
val rightHandBoundaries = samples.zipWithIndex.collect { case (field, i) if collectSplit(i) => field.get(partitionKey) }
獲取右側邊界,使用採樣數據數組索引對每一個分區採樣數求餘等於0對採樣數據進行過濾取右側邊界(如匹配條件不爲空則再取最後一條數據);
如採樣獲得62條數據,而且沒有存在匹配條件,根據上述的採樣數據過濾條件最後取得7條數據,分別爲數據數組索引爲0、索引爲十、20、30、40、50、60的7條數據,數據的值爲PartitionKey默認就是集合中_id字段的值;ip
建立分區(Partitions)
獲取獲得PartitionKey、rightHandBoundaries後就能夠調用PartitionerHelper.createPartitions建立Partition;下面爲建立Partition的具體邏輯;
使用PartitionKey建立查詢邊界,每一個分區具備不一樣的查詢邊界,有最大、最小邊界; 此處建立分區Partition並在每一個分區中指定了查詢邊界;
上面獲取獲得了7條數據,此處將建立8個分區;下面給出了簡單數據用於說明該分區邊界條件的基本邏輯與實現;
一、建立Min、一、三、五、七、九、十一、1三、Max的序列
二、建立一、三、五、七、九、十一、1三、Max序列
三、使用zip將兩個序列拉鍊式的合併:合併後的數據爲:
四、Min,一、1,三、3,五、5,七、7,九、9,十一、11,1三、13,Max
Partition的邊界條件將使用上面的邊界條件,8條數據八個Partition一個對應;
0 Partition的邊界條件爲:小於1
1 Partition的邊界條件爲:大於等於1 小於 3
2 Partition的邊界條件爲:大於等於3 小於 5
3 Partition的邊界條件爲:大於等於5 小於 7
4 Partition的邊界條件爲:大於等於7小於 9
5 Partition的邊界條件爲:大於等於9 小於 11
6 Partition的邊界條件爲:大於等於11 小於 13
7 Partition的邊界條件爲:大於等於13
上面的8個Partition爲8個MongoPartition對象,每一個對象的index、查詢邊界與上面所說的一一對應;
在MongoRDD類的compute方法中能夠看到根據對應的分區與上面建立分區時所創建的邊界條件用於計算(從Mongo中獲取對應數據);
建立單分區分區器時,直接調用PartitionerHelper.createPartitions方法建立分區,該類並沒有其餘邏輯,而且固定的PartitionKey爲_id,右側邊界條件爲空集合,而後建立id爲0的MongoPartition對象,並沒有查詢邊界;