// Target connection string: mongodb://<host>:<port>/<database>.<collection>.
// Replace ip:port with the real server address before running.
val uri = MongoClientURI("mongodb://ip:port/db.collection")

// Resolve the target database and collection up front, so a malformed URI fails with a clear
// message (instead of a bare NoSuchElementException from Option.get) before any connection
// is opened.
val dbName = uri.database.getOrElse(
  throw new IllegalArgumentException("MongoDB URI must name a database: mongodb://host:port/db.collection"))
val collName = uri.collection.getOrElse(
  throw new IllegalArgumentException("MongoDB URI must name a collection: mongodb://host:port/db.collection"))

// resultdata is a Spark pair RDD here (any RDD[(K, V)] can be substituted).
// Keep exactly one value per key so that every document gets a unique _id.
// reduceByKey keeps an arbitrary value per key. That is the same contract as taking the first
// element after groupByKey, whose value order is not guaranteed, but it avoids shuffling and
// buffering every value of a key.
val outtomongo = resultdata
  .reduceByKey((kept, _) => kept)
  .collect() // NOTE(review): pulls the whole result onto the driver; only suitable for small outputs.
  .map { case (id, value) =>
    val bson = new BasicDBObject()
    bson.put("_id", id)
    bson.put("value", value)
    bson
  }

// Gather the _ids being written, so any existing documents with those ids are removed first.
val removelist = new BasicDBList
outtomongo.foreach(doc => removelist.add(doc.get("_id")))
val query = new BasicDBObject("_id", new BasicDBObject("$in", removelist))

val mongoClient = MongoClient(uri)
try {
  val coll = mongoClient(dbName)(collName)

  // The driver rejects an empty bulk write / insert batch, so skip all writes when there is
  // nothing to store.
  if (outtomongo.nonEmpty) {
    // Bulk delete: drop existing documents with these _ids so the inserts below cannot hit
    // duplicate keys. NOTE(review): delete + insert is not atomic; readers may briefly see the
    // documents missing.
    coll.remove(query)

    // Bulk insert, option 1: unordered bulk write.
    val builder = coll.initializeUnorderedBulkOperation
    outtomongo.foreach(doc => builder.insert(doc))
    builder.execute()

    // Bulk insert, option 2. Use it INSTEAD of option 1, never together: running both inserts
    // every document twice, and the second pass fails with duplicate _id errors.
    // coll.insert(outtomongo: _*)
  }
} finally {
  // Release the client's connections even if a write fails.
  mongoClient.close()
}

// Jars used: mongo-hadoop-core-1.3.0.jar, mongo-java-driver-3.2.0.jar.
// NOTE(review): MongoClientURI(...) with Option-valued database/collection looks like the Casbah
// Scala driver, which is not in this list; confirm it is on the classpath.