Mongdb中經常使用的數據清洗

前言

分享一些Mongdb經常使用的數據清洗方式python

注:"Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in." 緣由與解決:

mongdb_bigdata_error.png

  • 緣由
大數據計算、數據統計時,每一個計算任務(job或task)都會使用獨立有限的內存空間,mongodb沒有提供複雜的內存分配模型(任務調度算法),僅限定每一個stage最多使用100M內存,若是超過此值將終止計算並返回error;爲了支持較大數據集合的處理,咱們能夠指定「allowDiskUse」參數將「溢出」的數據寫入本地的臨時文件中(臨時的collection),這個參數咱們一般須要設定爲true。
  • 解決方案:
{allowDiskUse:true}

查詢重複數據

db.feedImg_all.aggregate([
    {
        $group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
    },
    {
        $match: {count: {$gt: 1}}
    }
],{allowDiskUse:true})

刪除重複數據

db.xiuxiu_all.aggregate([
    {
        $group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
    },
    {
        $match: {count: {$gt: 1}}
    }
],{allowDiskUse:true}).forEach(function(doc){
    doc.dups.shift();                       // 去除重複組的第一個元數據_id,獲得除第一個以外的其餘元組
    db.xiuxiu_all.remove({_id: {$in: doc.dups}}); // remove()刪除這些重複的數據
})

刪除重複數據(python版本)

# -*- coding:utf-8 -*-
import pymongo
from pymongo import DeleteOne
'''
@author: lcx
@time: 2018/11/15
@desc:

'''

pipeline = [
    {
        '$group': {
            '_id': {'mvid': '$mvid', 'feed_id': '$feed_id'},
            'count': {'$sum': 1},
            'dups': {
                '$addToSet': '$_id'
            }
        },
    },
    {
        '$match': {
            'count': {
                '$gt': 1
            }
        }
    }
]

myclient = pymongo.MongoClient(host='m3005.test.com',port=3005,connect=False)
db = myclient.deepnet_test

if __name__ == '__main__':
    map_id = map(lambda doc: doc['dups'][1:], db['xiuxiu_all'].aggregate(pipeline=pipeline,allowDiskUse=True))
    list_id = [item for sublist in map_id for item in sublist]
    print(db['xiuxiu_all']
          .bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id)))
          .bulk_api_result)

複製collection裏的數據到另外一個collection中

db.xiuxiu_all.find().forEach(function(x){
    db.xiuxiu_all_bak.insert(x);
})

過濾重複字段並統計總記錄數

db.feedImg_all.aggregate(
  [
     {$match:{"createTime": {"$gte":  1541606400, "$lt": 1541692800}}}, // 添加過濾條件
     {$project:{"feedId": true}},  
     {$group:{_id: "$feedId"}}, 
     {$group:{_id: null, count: {$sum:1}}}
 ], {allowDiskUse: true})
相關文章
相關標籤/搜索