1. MongoDB 的連接類（Java）
import org.apache.log4j.Logger; import com.mongodb.MongoClient; import com.mongodb.client.MongoDatabase; public class MongoHelper { static Logger logger = Logger.getLogger(MongoHelper.class); static final String DBName = "wb"; static final String ServerAddress = "localhost"; static final int PORT = 27017; public MongoHelper(){ } public MongoClient getMongoClient( ){ MongoClient mongoClient = null; try { // 鏈接到 mongodb 服務 mongoClient = new MongoClient(ServerAddress, PORT); logger.info("======================Connect to mongodb successfully"); } catch (Exception e) { System.err.println(e.getClass().getName() + ": " + e.getMessage()); } return mongoClient; } public MongoDatabase getMongoDataBase(MongoClient mongoClient) { MongoDatabase mongoDataBase = null; try { if (mongoClient != null) { // 鏈接到數據庫 mongoDataBase = mongoClient.getDatabase(DBName); logger.info("=================Connect to DataBase successfully"); } else { throw new RuntimeException("MongoClient不可以爲空"); } } catch (Exception e) { e.printStackTrace(); } return mongoDataBase; } public void closeMongoClient(MongoDatabase mongoDataBase,MongoClient mongoClient ) { if (mongoDataBase != null) { mongoDataBase = null; } if (mongoClient != null) { mongoClient.close(); mongoClient=null; } logger.info("==================mongoClient:"+mongoClient); logger.info("===============CloseMongoClient successfully"); } }
2. MongoDB 的簡單插入算法
(1) 每次從庫裏查詢出 100 條數據進行處理
(2) 處理數據
(3) 切庫
(4) 查詢
(5) 插入到另外一個庫
import java.util.ArrayList; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.BasicDBObject; import com.mongodb.Block; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; public class fanMain { private final static Logger logger = Logger.getLogger(fanMain.class); public static void main(String[] args) { MongoHelper mongoHelper = new MongoHelper(); MongoClient mongoClient = mongoHelper.getMongoClient(); MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient); MongoCollection<Document> collection = null; int id = Integer.valueOf(args[0]); int skip = id; logger.info("===skip:"+skip); try { while (true) { collection = mongoDatabase.getCollection("wbid"); logger.info("=========切入wbid表"); logger.info("====ture skip:"+skip); MongoCursor<Document> mongoCursor = collection.find().skip(skip).limit(100).iterator(); if (mongoCursor.hasNext()) { while (mongoCursor.hasNext()) { Document yh = mongoCursor.next(); String jj=yh.toJson(); JSONObject js=JSONObject.fromObject(jj); String yongh=js.getString("yongh"); logger.info("=============yonghu:"+yongh); try{ System.err.println("===近來try"); MongoCollection<Document> coll = mongoDatabase.getCollection("wbData"); logger.info("=========切入wbData表"); Bson parent = Filters.eq("yongh", yongh); logger.info("===執行了parent"); //返回總數 /*Date d=new Date(); long t1=d.getTime(); System.out.println("t1:"+t1); logger.info("=========t1:"+t1);*/ Long post_count=coll.count(Filters.eq("yongh", yongh)); System.out.println("post_count:"+post_count); logger.info("======post_count:"+post_count); /*long t2=d.getTime(); System.out.println("t2:"+t2); logger.info("======t2:"+t2); long tt=t2-t1; logger.info("===tt:"+tt);*/ logger.info("=====post_count:"+post_count); MongoCursor<Document> 
m=coll.find(parent).iterator(); /*long t3=d.getTime(); System.out.println("t3:"+t3); logger.info("===========t3:"+t3); long tt1=t3-t2; logger.info("===tt1:"+tt1); logger.info("===查詢了!!!tt1:"+tt1);*/ logger.info("======查詢了!!!"); int count = 0; int fans=0; Document item = m.next(); fans=(int)item.get("nfans");//9c1d5252eccf6493b9a835ba74b979b0 System.out.println(fans); logger.info("===fans:"+fans); BasicDBObject group = new BasicDBObject(); /*while(m.hasNext()){ logger.info("====進入while"); Document item = m.next(); String s=(String)item.get("yongh"); System.out.println(s); fans=(int)item.get("nfans"); System.out.println(fans); logger.info("===fans:"+fans); count++; logger.info("===count:" + count); break; }*/ JSONObject json = new JSONObject(); json.put("yongh", yongh); json.put("post_count", post_count); json.put("fans", fans); System.out.println(json.get("post_count")); System.err.println(json.get("fans")); String obj=json.toString(); System.out.println("====obj:"+obj); System.err.println("====obj:"+obj); MongoCollection<Document> c = mongoDatabase.getCollection("wbfans"); logger.info("===切換到wbfans裏"); Document doc = Document.parse(obj); c.insertOne(doc); logger.info("!!!****************!insert===插入fans成功!!!!"); logger.info("===完成插入skip:"+skip); }catch(Exception e){ e.printStackTrace(); } } logger.info("==束==skip qian:"+skip); logger.error("==束==skip qian:"+skip); System.out.println("==束==skip qian:"+skip); System.err.println("==束==skip qian:"+skip); skip = skip + 100; logger.info("==循環結束=skip變化:"+skip); logger.error("==循環結束=skip變化:"+skip); } else { logger.info("======要退出了!!!"); break; } } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } finally { mongoHelper.closeMongoClient(mongoDatabase, mongoClient); } } }
3. 此方法和 2 中的效果相同，只是實現方式不一樣
import java.awt.List; import java.util.ArrayList; import java.util.Map; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.Block; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; public class countFansMain { private final static Logger logger = Logger.getLogger(countFansMain.class); public static void main(String[] args) { MongoHelper mongoHelper = new MongoHelper(); MongoClient mongoClient = mongoHelper.getMongoClient(); MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient); MongoCollection<Document> collection = null; int skip = 0; try { while (true) { collection = mongoDatabase.getCollection("wbid"); logger.info("=========切入wbid表"); FindIterable<Document> findIterable = collection.find() .skip(skip).limit(100); MongoCursor<Document> mongoCursor = findIterable.iterator(); if (mongoCursor.hasNext()) { Document yh = mongoCursor.next(); for (Document document : findIterable) { Object yongh = document.get("yongh"); logger.info("===yonghu:" + yongh); collection = mongoDatabase.getCollection("wbData"); logger.info("=========切入wbData表"); try{ System.err.println("===近來try"); logger.info("===進來xiaotry"); FindIterable<Document> iters = collection .find(new Document("yongh", yongh)); logger.info("======執行了查詢用戶"); System.err.println("======執行了查詢用戶"); MongoCursor<Document> moc = iters.iterator(); logger.info("=====MongoCursor"); System.err.println("======執行了查詢用戶"); int count = 0; Document fans = moc.next(); Object fs = null; for (Document d : iters) { logger.info("=====開始遍歷!!!!"); System.err.println("====開始遍歷!!!!!"); fs= d.get("nfans"); logger.info("====fans:" + fs); count++; logger.info("===count:" + count); } JSONObject 
json = new JSONObject(); json.put("yongh", yongh); json.put("post_count", count); json.put("fans", fs); System.out.println(json.get("post_count")); System.err.println(json.get("fans")); String obj=json.toString(); System.out.println("====obj:"+obj); System.err.println("====obj:"+obj); collection = mongoDatabase.getCollection("wbfans"); logger.info("===切換到wbfans裏"); Document doc = Document.parse(obj); collection.insertOne(doc); logger.info("!!!****************!insert===插入fans成功!!!!"); }catch(Exception e){ e.printStackTrace(); } } skip = skip + 100; logger.info("====skip變化:"+skip); } else { logger.info("======要退出了!!!"); break; } } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } finally { mongoHelper.closeMongoClient(mongoDatabase, mongoClient); } } }
4. MongoDB 的聚合函數應用，主要是應用 db.wbfans.aggregate([{$group : {_id : {post_count:"$post_count"}, sum_fans: {$sum: "$fans"}}}])
按照 post_count 分組，並對 fans 列求和。以下是對 AggregateIterable<Document> iterable = collection.aggregate(pipeline).allowDiskUse(true); 中 allowDiskUse(true) 的解釋：
如果你從事過大數據計算、數據統計等相關工作，應該知道每個計算任務（job 或 task）都會使用獨立的、有限大小的內存空間。mongodb 沒有提供複雜的內存分配模型（任務調度算法），只是簡單地限定每個 stage 最多使用 100MB 內存，如果超過此值將終止計算並返回 error；爲了支持較大數據集合的處理，我們可以指定「allowDiskUse」參數，將「溢出」的數據寫入本地的臨時文件中（臨時的 collection），這個參數我們通常需要設定爲 true。（參見上述示例）
mport java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.bson.Document; import com.mongodb.MongoClient; import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; public class avgFansMain { private final static Logger logger = Logger.getLogger(avgFansMain.class); public static void main(String[] args) { MongoHelper mongoHelper = new MongoHelper(); MongoClient mongoClient = mongoHelper.getMongoClient(); MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient); MongoCollection<Document> collection = null; /* int id = Integer.valueOf(args[0]); */ int skip = 0; logger.info("===skip:" + skip); try { collection = mongoDatabase.getCollection("wbfans"); logger.info("=========切入wbfans表"); logger.info("====ture skip:" + skip); List<Document> pipeline = Arrays.asList(new Document("$group", new Document("_id", new Document("post_count", "$post_count") ).append("sum_fans", new Document("$sum", "$fans")))); AggregateIterable<Document> iterable = collection.aggregate( pipeline).allowDiskUse(true); int i = 0; for (Document d : iterable) { i++; logger.info("****************第" + i + "條***************"); Document _id = (Document) d.get("_id"); int post_count = _id.getInteger("post_count"); logger.info("====post_count:" + post_count); Long sumPostCount = collection.count(Filters.eq("post_count", post_count)); logger.info("=======sumPostCount:" + sumPostCount); int sum_fans = (int) d.get("sum_fans"); logger.info("======sum_fans:" + sum_fans); double sumFans = (double) sum_fans; double sumPC = (double) sumPostCount; logger.info("=======sumFans:" + sumFans + ",sumPC:" + sumPC); double avgFans = sumFans / sumPC; logger.info("======avgFans:" + avgFans); JSONObject json = new JSONObject(); json.put("post_count_yongh", post_count); json.put("sum_yongh", 
sumPostCount); json.put("sum_fans", sum_fans); json.put("avg_fans", avgFans); // 插入集合中 MongoCollection<Document> c = mongoDatabase .getCollection("vagFans"); logger.info("===切換到vagFans裏"); Document doc = Document.parse(json.toString()); c.insertOne(doc); logger.info("!!!****************!insert===插入avg_fans成功!!!!"); } } catch (Exception e) { e.printStackTrace(); } finally { mongoHelper.closeMongoClient(mongoDatabase, mongoClient); } } }
推薦閱讀：https://blog.csdn.net/xundh/article/details/49384393