Scala 操作 MongoDB 批量插入和刪除的幾種方式

// The connection string must name both the database and the collection:
// mongodb://host:port/db.collection
val uri = MongoClientURI("mongodb://ip:port/db.collection")
val mongoClient = MongoClient(uri)
try {
  val coll = mongoClient(
    uri.database.getOrElse(sys.error("MongoDB URI is missing a database name")))(
    uri.collection.getOrElse(sys.error("MongoDB URI is missing a collection name")))

  // Keep exactly one value per key so every document gets a unique _id.
  // resultdata is a Spark PairRDD here; any (key, value) pair collection works the same way.
  val outtomongo = resultdata
    .groupByKey()
    .map { case (id, values) => (id, values.head) } // groupByKey never yields an empty group
    .collect()
    .map { case (id, value) =>
      val bson = new BasicDBObject()
      bson.put("_id", id)
      bson.put("value", value)
      bson
    }

  // Nothing to write: skip entirely (an empty bulk operation throws on execute()).
  if (outtomongo.nonEmpty) {
    // Batch delete: remove existing documents with the same _id first,
    // otherwise the inserts below would fail with duplicate-key errors.
    // NOTE: delete + insert is not atomic; a failure in between loses those documents.
    val removelist = new BasicDBList
    outtomongo.foreach(doc => removelist.add(doc.get("_id")))
    coll.remove(new BasicDBObject("_id", new BasicDBObject("$in", removelist)))

    // Batch insert, option 1: unordered bulk write.
    val builder = coll.initializeUnorderedBulkOperation
    outtomongo.foreach(doc => builder.insert(doc))
    builder.execute()

    // Batch insert, option 2 — use INSTEAD of option 1, never both.
    // Running both inserts the same _ids twice and fails with duplicate-key errors.
    // coll.insert(outtomongo: _*)
  }
} finally {
  // Always release the client's connection pool, even if a write fails.
  mongoClient.close()
}



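所需 jar 包：
mongo-hadoop-core-1.3.0.jar、mongo-java-driver-3.2.0.jar
（註：上面代碼中的 `MongoClient(uri)`、`uri.database`、`coll(...)` 等寫法屬於 Casbah（MongoDB 官方 Scala 驅動）的 API，因此還需引入對應版本的 casbah jar，並 `import com.mongodb.casbah.Imports._`。）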
相關文章
相關標籤/搜索