MongoDB線程安全批量處理

Mongo批處理工具類:java

package com.saike.solr.server.util;

import java.net.UnknownHostException;
import java.util.ArrayList;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;

/**
 * 批處理工具類
 * @author xieyong
 *
 */
public class UtileMongDB {
    
    UtilThreadLocal<ArrayList<DBObject>> localBatch;
    /**mongo單例對象  根據官方文檔mongojava是線程安全的*/
    private static Mongo mongo;
    private static DBCollection coll;
    //private static Log log = LogFactory.getLog(UtileMongDB.class);
    private static DB db;
    
    static{
           /** 實例化db*/
           MongoOptions options = new MongoOptions();
                      options.autoConnectRetry = true;
                      options.connectionsPerHost = 1000;
                      options.maxWaitTime = 5000;
                      options.socketTimeout = 0;
                      options.connectTimeout = 15000;
                      options.threadsAllowedToBlockForConnectionMultiplier = 5000;
            try {
                mongo = new Mongo(MongoDBConstant.MONGO_HOST,MongoDBConstant.MONGO_PORT);
            } catch (UnknownHostException | MongoException e) {
                e.printStackTrace();
            }
            // boolean auth = db.authenticate(myUserName, myPassword);
    }
    
    public UtileMongDB(){
        try {
            localBatch = new UtilThreadLocal<ArrayList<DBObject>>(ArrayList.class);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
    /**
     * 返回db對象
     * @return db
     */
    public static DB getDB(){
        if(db==null){
            db = mongo.getDB(MongoDBConstant.MONGO_DB);
        }
        return db;
    }
    
    /**
     * 返回mongo
     * @return mongo鏈接池
     */
    public static Mongo getMong(){
        return mongo;
    }
    
    /**
     * 讀取集合
     * @return mongo集合
     * */
    public static DBCollection getColl(String collname){
        return getDB().getCollection(collname);
    }
    
    public static DBCollection getColl(){
        return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION);
    }
    
    /**  crud操做 */
    public void addBatch(String key,String value){
        BasicDBObject basicDB = new BasicDBObject();
        basicDB.put(key, value);
        /** 這裏用線程本地變量,不用會存在競技條件*/
        localBatch.newGet().add(basicDB);
    }
    
    /**
     * 執行批處理
     * */
    public void executeInsertBatch(){
        getColl().insert(localBatch.get());
        localBatch.get().clear();
    }
    /**
     * 執行批量刪除
     */
    public void executeDeleteBatch(){
        ArrayList<DBObject> array = localBatch.get();    
        for(DBObject obj:array){
            getColl().remove(obj);
        }
        localBatch.get().clear();
    }
    
    
    
    
    
    public DBCursor query(String key,String value){
        BasicDBObject basicDBObject = new BasicDBObject();
        basicDBObject.put(key,value);
        return getColl().find(basicDBObject);
    }
        
}

 

 

ThreadLocal的封裝:mongodb

package com.saike.solr.server.util;

import java.lang.reflect.Constructor;

/**
 * 
 * @author xieyong
 *
 * @param <T> 本地線程變量對象了類型
 */
public class UtilThreadLocal<T> extends ThreadLocal<T> {
    /**參數集合*/
    Object[] obj;
    /**實例化構造函數*/
    Constructor<T> construct;
    
    /**
     * 
     * @param clazz        本地變量的class
     * @param args        構造函數的參數
     * @throws NoSuchMethodException
     * @throws SecurityException
     */
    public UtilThreadLocal(Class clazz,Object... args) throws NoSuchMethodException, SecurityException{
        this.obj = obj;
        Class[] clazzs = null;
        /** new 獲取參數class供獲取構造函數用*/
        if(args != null)
            if(args.length !=0){
                clazzs = new Class[args.length];
                for(int i = 0;i<args.length;i++){
                    clazzs[i] = args[i].getClass();
                }
            }
        this.construct = clazz.getConstructor(clazzs);
    }
    
    /**
     * 若是當前線程沒有對象建立一個新對象
     * @return
     */
    public T newGet(){
        T tar = super.get() ;
        if(tar == null){
            try {
                tar = construct.newInstance(obj);
                super.set(tar);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        return tar;
    }
}
相關文章
相關標籤/搜索