分享一些Mongdb經常使用的數據清洗方式python
大數據計算、數據統計時,每一個計算任務(job或task)都會使用獨立有限的內存空間,mongodb沒有提供複雜的內存分配模型(任務調度算法),僅限定每一個stage最多使用100M內存,若是超過此值將終止計算並返回error;爲了支持較大數據集合的處理,咱們能夠指定「allowDiskUse」參數將「溢出」的數據寫入本地的臨時文件中(臨時的collection),這個參數咱們一般須要設定爲true。
{allowDiskUse:true}
db.feedImg_all.aggregate([ { $group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}} }, { $match: {count: {$gt: 1}} } ],{allowDiskUse:true})
db.xiuxiu_all.aggregate([ { $group: { _id: {'mvid': '$mvid','feed_id':'$feed_id'},count: {$sum: 1},dups: {$addToSet: '$_id'}} }, { $match: {count: {$gt: 1}} } ],{allowDiskUse:true}).forEach(function(doc){ doc.dups.shift(); // 去除重複組的第一個元數據_id,獲得除第一個以外的其餘元組 db.xiuxiu_all.remove({_id: {$in: doc.dups}}); // remove()刪除這些重複的數據 })
# -*- coding:utf-8 -*- import pymongo from pymongo import DeleteOne ''' @author: lcx @time: 2018/11/15 @desc: ''' pipeline = [ { '$group': { '_id': {'mvid': '$mvid', 'feed_id': '$feed_id'}, 'count': {'$sum': 1}, 'dups': { '$addToSet': '$_id' } }, }, { '$match': { 'count': { '$gt': 1 } } } ] myclient = pymongo.MongoClient(host='m3005.test.com',port=3005,connect=False) db = myclient.deepnet_test if __name__ == '__main__': map_id = map(lambda doc: doc['dups'][1:], db['xiuxiu_all'].aggregate(pipeline=pipeline,allowDiskUse=True)) list_id = [item for sublist in map_id for item in sublist] print(db['xiuxiu_all'] .bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id))) .bulk_api_result)
db.xiuxiu_all.find().forEach(function(x){ db.xiuxiu_all_bak.insert(x); })
db.feedImg_all.aggregate( [ {$match:{"createTime": {"$gte": 1541606400, "$lt": 1541692800}}}, // 添加過濾條件 {$project:{"feedId": true}}, {$group:{_id: "$feedId"}}, {$group:{_id: null, count: {$sum:1}}} ], {allowDiskUse: true})