Highly Available MongoDB Cluster: Hands-On

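1. Overview

  Recently some classmates and readers have messaged me with questions about MongoDB, so I have put together this post on MongoDB for your reference. It covers the following:

  • Basic operations
  • CRUD
  • MapReduce

  Everything in this article is demonstrated on a MongoDB cluster (Sharding + Replica Sets), so the operations are at the cluster level and some commands differ from those used against a standalone MongoDB instance. For the cluster setup itself, see my earlier post "Highly Available MongoDB Cluster".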

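2. Basic operations

  Commonly used shell commands are as follows:

// Database help
db.help()
// Collection help
db.yourColl.help()
// Help on replica sets
rs.help()
// List database names
show dbs
// List the collections in the current database
show collections
// Select a database
use db_name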

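  To inspect basic information about a collection:

// Show help
db.yourColl.help();

// Count the documents in the current collection
db.yourColl.count();

// Show the data size
db.userInfo.dataSize();

// Get the database that the current collection belongs to
db.userInfo.getDB();

// Get the status of the current collection
db.userInfo.stats();

// Get the total size of the collection
db.userInfo.totalSize();

// Storage size of the collection
db.userInfo.storageSize();

// Shard version information
db.userInfo.getShardVersion()

// Rename the collection userInfo to users
db.userInfo.renameCollection("users");

// Drop the current collection
db.userInfo.drop();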

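3. CRUD

3.1 Create

  In the cluster, we add a friends database by enabling sharding for it. Run the command from the admin database (use admin):

db.runCommand({enablesharding: "friends"});

  Once the database exists, we create a sharded user collection in it:

// A shard key is required; {_id: 1} is only an illustrative choice
db.runCommand({shardcollection: "friends.user", key: {_id: 1}});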

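3.2 Insert

  In MongoDB, both save and insert can add new documents, but they differ. If the object does not exist yet, both insert it into the collection. If a document with the same _id already exists, save calls update and overwrites the stored record, whereas insert leaves it untouched and fails with a duplicate-key error.

  In addition, insert can take a whole list in one call without iterating, which is more efficient, whereas with save you have to loop over the list and insert the items one by one. The prototypes of the two functions show this: for a remote call, the entire list is posted to MongoDB in one go, so it is more efficient.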
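To make the difference concrete, here is a minimal in-memory model in plain JavaScript (runnable under Node.js without a MongoDB connection). It is not MongoDB's implementation: the `Collection` class and its `nextId` counter are hypothetical stand-ins. The sketch assumes only that `insert` fails on an existing `_id`, while `save` inserts when there is no `_id` and otherwise replaces the stored document.

```javascript
// In-memory model of insert vs. save (illustrative only, not driver code).
class Collection {
  constructor() {
    this.docs = new Map(); // _id -> document
    this.nextId = 1;       // hypothetical stand-in for ObjectId generation
  }

  // insert: accepts one document or an array; fails on a duplicate _id.
  insert(docOrDocs) {
    const docs = Array.isArray(docOrDocs) ? docOrDocs : [docOrDocs];
    for (const doc of docs) {
      if (doc._id === undefined) doc._id = "auto-" + this.nextId++;
      if (this.docs.has(doc._id)) {
        throw new Error("duplicate key: _id " + doc._id);
      }
      this.docs.set(doc._id, { ...doc });
    }
  }

  // save: insert when there is no _id, otherwise replace (upsert by _id).
  save(doc) {
    if (doc._id === undefined) {
      this.insert(doc);
    } else {
      this.docs.set(doc._id, { ...doc });
    }
  }
}

const coll = new Collection();
// One call carries the whole list.
coll.insert([{ _id: 1, name: "zhangsan" }, { _id: 2, name: "lisi" }]);
// Existing _id: save replaces the stored document.
coll.save({ _id: 1, name: "zhangsan", age: 22 });

let insertError = null;
try {
  // Existing _id: insert is rejected and the stored document is unchanged.
  coll.insert({ _id: 1, name: "other" });
} catch (e) {
  insertError = e.message;
}
console.log(coll.docs.get(1), insertError);
```

The model stores copies of the documents so that later changes to the caller's objects do not leak into the "collection".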

  The save function prototype is as follows:

  The insert function prototype (partial code) is as follows:

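3.3 Query

3.3.1 Query all records

db.user.find();

  By default the shell shows 20 records per page. When there are more, use the it command to fetch the next page. Note: do not type a ";" after it. You can also change the page size with DBQuery.shellBatchSize = 50; after that, each page shows 50 records.

3.3.2 Query the distinct values of a field in the current collection

db.user.distinct("name");

// Filters out duplicate values of name; equivalent to:
select distinct name from user;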

3.3.3 Query with an equality condition

db.user.find({"age": 24});
// Equivalent to:
select * from user where age = 24;

3.3.4 Query with a greater-than condition

db.user.find({age: {$gt: 24}});

// Equivalent to:
select * from user where age > 24;

3.3.5 Query with a less-than condition

db.user.find({age: {$lt: 24}});
// Equivalent to:
select * from user where age < 24;

3.3.6 Query with a greater-than-or-equal condition

db.user.find({age: {$gte: 24}});
// Equivalent to:
select * from user where age >= 24;

3.3.7 Query with a less-than-or-equal condition

db.user.find({age: {$lte: 24}});
// Equivalent to:
select * from user where age <= 24;

3.3.8 Query with AND and OR conditions

  • AND

db.user.find({age: {$gte: 23, $lte: 26}});

// Equivalent to:
select * from user where age >= 23 and age <= 26;

  • OR

db.user.find({$or: [{age: 22}, {age: 25}]});

// Equivalent to:
select * from user where age = 22 or age = 25;
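The comparison operators and `$or` can be read as predicates over a single document. The sketch below is a tiny in-memory matcher in plain JavaScript, written only to illustrate those semantics. The `matches` function and the sample `users` array are hypothetical, and it handles just `$gt`, `$gte`, `$lt`, `$lte`, `$ne` and `$or`, not MongoDB's full query language.

```javascript
// Minimal matcher for a subset of MongoDB query operators (illustrative only).
const comparators = {
  $gt: (a, b) => a > b,
  $gte: (a, b) => a >= b,
  $lt: (a, b) => a < b,
  $lte: (a, b) => a <= b,
  $ne: (a, b) => a !== b,
};

function matches(doc, query) {
  return Object.entries(query).every(([field, cond]) => {
    if (field === "$or") {
      // $or: at least one sub-query must match
      return cond.some((sub) => matches(doc, sub));
    }
    if (cond !== null && typeof cond === "object") {
      // {age: {$gte: 23, $lte: 26}}: every operator must hold (implicit AND)
      return Object.entries(cond).every(([op, value]) =>
        comparators[op](doc[field], value)
      );
    }
    return doc[field] === cond; // plain equality, e.g. {age: 24}
  });
}

const users = [
  { name: "a", age: 22 },
  { name: "b", age: 24 },
  { name: "c", age: 25 },
  { name: "d", age: 30 },
];

const between = users.filter((u) => matches(u, { age: { $gte: 23, $lte: 26 } }));
const either = users.filter((u) => matches(u, { $or: [{ age: 22 }, { age: 25 }] }));
console.log(between.map((u) => u.name), either.map((u) => u.name));
```

Putting several operators on the same field, as in the AND example, is what makes a range query possible without an explicit `$and`.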

 

3.3.9 Fuzzy match

db.user.find({name: /mongo/});

// Equivalent to LIKE with % on both sides:
select * from user where name like '%mongo%';

3.3.10 Prefix match

db.user.find({name: /^mongo/});
// Similar to the LIKE syntax in SQL:
select * from user where name like 'mongo%';
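Both patterns are ordinary JavaScript regular expressions, so their behaviour can be checked without a database. A small sketch in plain JavaScript follows; the sample `names` array is made up for illustration.

```javascript
// /mongo/ matches anywhere in the string (like '%mongo%');
// /^mongo/ matches only at the start (like 'mongo%').
const names = ["mongodb", "my-mongo", "MongoDB", "mysql"];

const contains = names.filter((n) => /mongo/.test(n));
const startsWith = names.filter((n) => /^mongo/.test(n));
// Regular expressions are case-sensitive unless the i flag is given.
const ignoreCase = names.filter((n) => /^mongo/i.test(n));

console.log(contains, startsWith, ignoreCase);
```

Note that "MongoDB" is matched only by the pattern with the `i` flag.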

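3.3.11 Select specific fields

db.user.find({}, {name: 1, age: 1});

// Equivalent to:
select name, age from user;

  The value for name can also be true or false. With true the effect is the same as name: 1; with false, name is excluded and all fields other than name are shown.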
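The inclusion and exclusion rules can be modelled with a few lines of plain JavaScript. This is a sketch for illustration only: `project` is a hypothetical helper, not a MongoDB function, and it does not validate its input. It also shows one detail the SQL comparison hides: `_id` comes back by default unless it is switched off explicitly.

```javascript
// Model of the second argument of find() (illustrative only).
function project(doc, fields) {
  const keys = Object.keys(fields).filter((k) => k !== "_id");
  const including = keys.some((k) => fields[k]); // any 1/true => inclusion mode
  const out = {};
  for (const [key, value] of Object.entries(doc)) {
    if (key === "_id") {
      // _id is returned unless explicitly switched off with _id: 0 / false
      if (fields._id === undefined || fields._id) out._id = value;
    } else if (including ? fields[key] : !(key in fields)) {
      out[key] = value;
    }
  }
  return out;
}

const doc = { _id: 7, name: "zhangsan", age: 22, sex: "m" };
const included = project(doc, { name: 1, age: 1 });      // _id, name, age
const excluded = project(doc, { name: false });          // _id, age, sex
const noId = project(doc, { name: true, _id: 0 });       // name only
console.log(included, excluded, noId);
```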

3.3.12 Select specific fields with a condition

db.user.find({age: {$gt: 25}}, {name: 1, age: 1});

// Equivalent to:
select name, age from user where age > 25;

db.user.find({name: 'zhangsan', age: 22});

// Equivalent to:
select * from user where name = 'zhangsan' and age = 22;

3.3.13 Sorting

// Ascending:
db.user.find().sort({age: 1});
// Descending:
db.user.find().sort({age: -1});

3.3.14 Query the first 5 records

db.user.find().limit(5);

// Equivalent to:
select * from user limit 5;

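3.3.15 Records after the first N

db.user.find().skip(10);

// Roughly equivalent to:
select * from user where id not in ( select id from user limit 10 );

3.3.16 Query records within a range

// Skip the first 5 records and return the next 10 (records 6 to 15);
// skip is applied before limit regardless of the order of the calls
db.user.find().limit(10).skip(5);

  This can be used for pagination: limit is pageSize, and skip is the zero-based page index * pageSize.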
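The pagination rule can be sketched in plain JavaScript as follows. `pageParams` and `paginate` are hypothetical helper names, pages are numbered from 0 so that skip = page * pageSize, and an array slice stands in for the cursor.

```javascript
// Compute skip/limit for a zero-based page index.
function pageParams(page, pageSize) {
  return { skip: page * pageSize, limit: pageSize };
}

// Array stand-in for db.user.find().skip(skip).limit(limit).
function paginate(records, page, pageSize) {
  const { skip, limit } = pageParams(page, pageSize);
  return records.slice(skip, skip + limit);
}

const records = Array.from({ length: 23 }, (_, i) => i + 1); // 1..23
console.log(paginate(records, 0, 10)); // first page
console.log(paginate(records, 2, 10)); // last, partial page
```

If the pages shown to users start at 1, subtract 1 from the page number before calling `pageParams`.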

3.3.17 COUNT

db.user.find({age: {$gte: 25}}).count();

// Equivalent to:
select count(*) from user where age >= 25;

3.3.18 Sort the result set

db.userInfo.find({sex: {$exists: true}}).sort();

3.3.19 Not equal to NULL

db.user.find({sex: {$ne: null}})

// Equivalent to:
select * from user where sex is not null;

3.4 Indexes

  Create an index on the primary-key field with the following commands:

db.epd_favorites_folder.ensureIndex({"id":1},{"unique":true,"dropDups":true})
db.epd_focus.ensureIndex({"id":1},{"unique":true,"dropDups":true})

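3.5 Update

  The update command has the following format:

db.collection.update(criteria,objNew,upsert,multi)

  Parameters:

  criteria: the query condition.

  objNew: the update object, together with any update operators.

  upsert: whether to insert objNew as a new document when no record matches the update. true inserts it; the default, false, does not.

  multi: defaults to false, which updates only the first record found. If true, every record matching the condition is updated.

  The following example sets price to 2 for the record whose id is 1:

db.user.update({id: 1},{$set:{price:2}});

// Equivalent to:
update user set price=2 where id=1;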
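The effect of the upsert and multi flags is easiest to see on a small in-memory example. The sketch below is plain JavaScript and only models equality criteria with `$set`; the `update` function here is a hypothetical stand-in operating on an array, not the shell method.

```javascript
// Model of update(criteria, objNew, upsert, multi): equality criteria and $set only.
function update(docs, criteria, objNew, upsert = false, multi = false) {
  const isMatch = (d) => Object.keys(criteria).every((k) => d[k] === criteria[k]);
  const hits = docs.filter(isMatch);
  if (hits.length === 0) {
    // upsert: insert a new document built from the criteria plus the $set fields
    if (upsert) docs.push({ ...criteria, ...objNew.$set });
    return;
  }
  // multi = false: only the first match is changed
  const targets = multi ? hits : hits.slice(0, 1);
  for (const d of targets) Object.assign(d, objNew.$set);
}

const items = [
  { id: 1, price: 5 },
  { id: 1, price: 7 },
  { id: 2, price: 9 },
];
update(items, { id: 1 }, { $set: { price: 2 } });              // first match only
update(items, { id: 2 }, { $set: { price: 3 } }, false, true); // all matches
update(items, { id: 4 }, { $set: { price: 1 } }, true);        // no match: inserted
console.log(items);
```

With the default multi = false, the second document with id 1 keeps its old price, which is where the behaviour differs from the SQL statement.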

3.6 Delete

3.6.1 Delete specific records

db.user.remove({id: 1});

// Equivalent to:
delete from user where id=1;

3.6.2 Delete all records

db.user.remove({});

// Equivalent to:
delete from user;

3.6.3 DROP

db.user.drop();

// Equivalent to:
drop table user;

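4. MapReduce

  MapReduce in MongoDB works by writing JavaScript, which MongoDB then parses and executes. Below, the MR job is driven through the Java API. The code is as follows:

  The MongdbManager class, used to initialize MongoDB: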

package cn.mongo.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.DB;
import com.mongodb.Mongo;
import com.mongodb.MongoOptions;

/**
 * @Date Mar 3, 2015
 * 
 * @author dengjie
 * 
 * @Note mongodb manager
 */
public class MongdbManager {

    private static final Logger logger = LoggerFactory.getLogger(MongdbManager.class);
    private static Mongo mongo = null;
    private static String tag = SystemConfig.getProperty("dev.tag");

    private MongdbManager() {
    }

    static {
        initClient();
    }

    // get DB object
    public static DB getDB(String dbName) {
        return mongo.getDB(dbName);
    }

    // get DB object without param
    public static DB getDB() {
        String dbName = SystemConfig.getProperty(String.format("%s.mongodb.dbname", tag));
        return mongo.getDB(dbName);
    }

    // init mongodb pool
    private static void initClient() {
        try {
            String[] hosts = SystemConfig.getProperty(String.format("%s.mongodb.host", tag)).split(",");
            for (int i = 0; i < hosts.length; i++) {
                try {
                    String host = hosts[i].split(":")[0];
                    int port = Integer.parseInt(hosts[i].split(":")[1]);
                    mongo = new Mongo(host, port);
                    if (mongo.getDatabaseNames().size() > 0) {
                        logger.info(String.format("connection success,host=[%s],port=[%d]", host, port));
                        break;
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                    logger.error(String.format("create connection has error,msg is %s", ex.getMessage()));
                }
            }

            // Configure the connection pool
            MongoOptions opt = mongo.getMongoOptions();
            opt.connectionsPerHost = SystemConfig.getIntProperty(String.format("%s.mongodb.poolsize", tag));// poolsize
            opt.threadsAllowedToBlockForConnectionMultiplier = SystemConfig.getIntProperty(String.format(
                    "%s.mongodb.blocksize", tag));// blocksize
            opt.socketKeepAlive = true;
            opt.autoConnectRetry = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  The MongoDBFactory class wraps the business operations:

package cn.mongo.util;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.diexun.domain.MGDCustomerSchema;

import com.mongodb.BasicDBList;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

/**
 * @Date Mar 3, 2015
 *
 * @Author dengjie
 */
public class MongoDBFactory {

    private static Logger logger = LoggerFactory.getLogger(MongoDBFactory.class);

    // save data to mongodb
    public static void save(MGDCustomerSchema mgs, String collName) {
        DB db = null;
        try {
            db = MongdbManager.getDB();
            DBCollection coll = db.getCollection(collName);
            DBObject dbo = (DBObject) JSON.parse(mgs.toString());
            coll.insert(dbo);
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error(String.format("save object to mongodb has error,msg is %s", ex.getMessage()));
        } finally {
            if (db != null) {
                db.requestDone();
                db = null;
            }
        }
    }

    // batch insert
    public static void save(List<?> mgsList, String collName) {
        DB db = null;
        try {
            db = MongdbManager.getDB();
            DBCollection coll = db.getCollection(collName);
            BasicDBList data = (BasicDBList) JSON.parse(mgsList.toString());
            List<DBObject> list = new ArrayList<DBObject>();
            int commitSize = SystemConfig.getIntProperty("mongo.commit.size");
            int rowCount = 0;
            long start = System.currentTimeMillis();
            for (Object dbo : data) {
                rowCount++;
                list.add((DBObject) dbo);
                if (rowCount % commitSize == 0) {
                    try {
                        coll.insert(list);
                        list.clear();
                        logger.info(String.format("current commit rowCount = [%d],commit spent time = [%s]s", rowCount,
                                (System.currentTimeMillis() - start) / 1000.0));
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        logger.error(String.format("batch commit data to mongodb has error,msg is %s", ex.getMessage()));
                    }
                }
            }
            if (rowCount % commitSize != 0) {
                try {
                    coll.insert(list);
                    logger.info(String.format("insert data to mongo has spent total time = [%s]s",
                            (System.currentTimeMillis() - start) / 1000.0));
                } catch (Exception ex) {
                    ex.printStackTrace();
                    logger.error(String.format("commit end has error,msg is %s", ex.getMessage()));
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error(String.format("save object list to mongodb has error,msg is %s", ex.getMessage()));
        } finally {
            if (db != null) {
                db.requestDone();
                db = null;
            }
        }
    }
}
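The batch save method above buffers documents, flushes them every commitSize rows, and then flushes the remainder. The same control flow is sketched here in plain JavaScript for brevity. `insertInBatches` and the `insertMany` callback are hypothetical names; the callback stands in for `coll.insert(list)`.

```javascript
// Flush rows to insertMany in chunks of commitSize; the final chunk may be smaller.
function insertInBatches(rows, commitSize, insertMany) {
  let batch = [];
  let flushed = 0;
  for (const row of rows) {
    batch.push(row);
    if (batch.length === commitSize) {
      insertMany(batch);
      flushed += batch.length;
      batch = []; // start a fresh buffer rather than clearing the one just handed over
    }
  }
  if (batch.length > 0) {
    insertMany(batch); // remainder
    flushed += batch.length;
  }
  return flushed;
}

const batches = [];
const total = insertInBatches([1, 2, 3, 4, 5, 6, 7], 3, (b) => batches.push(b));
console.log(total, batches);
```

Checking the buffer length instead of `rowCount % commitSize` keeps the remainder test simple: anything left in the buffer after the loop still has to be written.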

  The LoginerAmountMR class is a MapReduce job that counts user logins:

package cn.mongo.mapreduce;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.bson.BSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.diexun.conf.ConfigureAPI.MR;
import cn.diexun.conf.ConfigureAPI.PRECISION;
import cn.diexun.domain.Kpi;
import cn.diexun.util.CalendarUtil;
import cn.diexun.util.MongdbManager;
import cn.diexun.util.MysqlFactory;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MapReduceOutput;
import com.mongodb.ReadPreference;

/**
 * @Date Mar 13, 2015
 * 
 * @Author dengjie
 * 
 * @Note use mr jobs stats user login amount
 */
public class LoginerAmountMR {
    private static Logger logger = LoggerFactory.getLogger(LoginerAmountMR.class);

    // Build the map function as a JavaScript string
    private static String map() {
        String map = "function(){";
        map += "if(this.userName != \"\"){";
        map += "emit({" + "kpi_code:'login_times',username:this.userName,"
                + "district_id:this.districtId,product_style:this.product_style,"
                + "customer_property:this.customer_property},{count:1});";
        map += "}";
        map += "}";
        return map;
    }

    // Build the reduce function as a JavaScript string
    private static String reduce() {
        String reduce = "function(key,values){";
        reduce += "var total = 0;";
        reduce += "for(var i=0;i<values.length;i++){";
        reduce += "total += values[i].count;}";
        reduce += "return {count:total};";
        reduce += "}";
        return reduce;
    }

    public static void main(String[] args) {
        loginNumbers("t_login_20150312");
    }

    /**
     * login user amount
     * 
     * @param collName
     */
    public static void loginNumbers(String collName) {
        DB db = null;
        try {
            db = MongdbManager.getDB();
            db.setReadPreference(ReadPreference.secondaryPreferred());
            DBCollection coll = db.getCollection(collName);
            String result = MR.COLLNAME_TMP;

            long start = System.currentTimeMillis();
            MapReduceOutput mapRed = coll.mapReduce(map(), reduce(), result, null);
            logger.info(String.format("mr run spent time=%ss", (System.currentTimeMillis() - start) / 1000.0));
            start = System.currentTimeMillis();
            DBCursor cursor = mapRed.getOutputCollection().find();
            List<Kpi> list = new ArrayList<Kpi>();
            while (cursor.hasNext()) {
                DBObject obj = cursor.next();
                BSONObject key = (BSONObject) obj.get("_id");
                BSONObject value = (BSONObject) obj.get("value");
                Object kpiValue = value.get("count");
                Object userName = key.get("username");
                Object districtId = key.get("district_id");
                Object customerProperty = key.get("customer_property");
                Object productStyle = key.get("product_style");
                Kpi kpi = new Kpi();
                try {
                    kpi.setUserName(userName == null ? "" : userName.toString());
                    kpi.setKpiCode(key.get("kpi_code").toString());
                    kpi.setKpiValue(Math.round(Double.parseDouble(kpiValue.toString())));
                    kpi.setCustomerProperty(customerProperty == null ? "" : customerProperty.toString());
                    // Treat a missing or empty district id as 0 (compare by value, not by reference)
                    kpi.setDistrictId(districtId == null || "".equals(districtId.toString()) ? 0
                            : Integer.parseInt(districtId.toString()));
                    kpi.setProductStyle(productStyle == null ? "" : productStyle.toString());
                    kpi.setCreateDate(collName.split("_")[2]);
                    kpi.setUpdateDate(Timestamp.valueOf(CalendarUtil.formatMap.get(PRECISION.HOUR).format(new Date())));
                    list.add(kpi);
                } catch (Exception exx) {
                    exx.printStackTrace();
                    logger.error(String.format("parse type or get value has error,msg is %s", exx.getMessage()));
                }
            }
            MysqlFactory.insert(list);
            logger.info(String.format("store mysql spent time is %ss", (System.currentTimeMillis() - start) / 1000.0));
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error(String.format("run map-reduce jobs has error,msg is %s", ex.getMessage()));
        } finally {
            if (db != null) {
                db.requestDone();
                db = null;
            }
        }
    }
}

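5. Summary

  When writing MapReduce jobs for MongoDB, take care when concatenating the JavaScript strings, because it is easy to make mistakes. The code above is only part of the full program and is provided for study and reference. Also, for real MapReduce workloads I recommend Hadoop's MapReduce framework; MongoDB's MapReduce is covered here only as an introduction.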
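Because the map and reduce functions are assembled from string fragments, it can help to run the same logic as ordinary JavaScript before embedding it in Java. The sketch below is an in-memory approximation, not MongoDB's engine. `runMapReduce`, the module-level `emit` stand-in and the sample `logins` array are hypothetical. Keys are grouped by their JSON text and, as in MongoDB, reduce is only invoked for keys with more than one emitted value.

```javascript
let groups; // JSON text of the key -> { key, values }

// Stand-in for the emit() that MongoDB provides to map functions.
function emit(key, value) {
  const id = JSON.stringify(key);
  if (!groups.has(id)) groups.set(id, { key, values: [] });
  groups.get(id).values.push(value);
}

// The same logic the Java code builds as strings, written directly.
function map() {
  if (this.userName != "") {
    emit(
      {
        kpi_code: "login_times",
        username: this.userName,
        district_id: this.districtId,
        product_style: this.product_style,
        customer_property: this.customer_property,
      },
      { count: 1 }
    );
  }
}

function reduce(key, values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    total += values[i].count;
  }
  return { count: total };
}

// Run map over every document, then reduce each group of emitted values.
function runMapReduce(docs, mapFn, reduceFn) {
  groups = new Map();
  for (const doc of docs) mapFn.call(doc);
  return Array.from(groups.values()).map(({ key, values }) => ({
    _id: key,
    value: values.length > 1 ? reduceFn(key, values) : values[0],
  }));
}

const logins = [
  { userName: "tom", districtId: 1, product_style: "a", customer_property: "vip" },
  { userName: "tom", districtId: 1, product_style: "a", customer_property: "vip" },
  { userName: "amy", districtId: 2, product_style: "b", customer_property: "std" },
  { userName: "", districtId: 2, product_style: "b", customer_property: "std" },
];

const output = runMapReduce(logins, map, reduce);
console.log(JSON.stringify(output));
```

Each output entry has the `_id` and `value` shape that the Java code reads back from the output collection, so a typo in a field name shows up here before the strings are concatenated.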

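6. Closing remarks

  That is all for this post. If you run into problems while studying this material, you can join the discussion group or send me an email, and I will do my best to answer. Let's keep encouraging each other!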
