mgo:是MongoDB的Go語言驅動,它用基於Go語法的簡單API實現了豐富的特性,並通過良好測試。使用起來很順手,文檔足夠,前期一直在使用,惋惜是不維護了;node
mongo-go-driver:官方的驅動,設計的很底層,從mgo轉來的時候不是很順手,主要是使用事務;git
mgogithub
import ( "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" ) session, err := mgo.Dial("127.0.0.1:27017")
mongo-go-drivermongodb
import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:27017"))
二者在數據庫的鏈接上都很簡單,後者使用了options,能夠設置鏈接數,鏈接時間,socket時間,超時時間等;數據庫
mgo數組
func createUniqueIndex(collection string, keys ...string) { ms, c := Connect(setting.DatabaseSetting.DBName, collection) defer ms.Close() // 設置統計表惟一索引 index := mgo.Index{ Key: keys, // 索引鍵 Unique: true, // 惟一索引 DropDups: true, // 存在數據後建立, 則自動刪除重複數據 Background: true, // 不長時間佔用寫鎖 } // 建立索引 err := c.EnsureIndex(index) if err != nil { Logger.Error("EnsureIndex error", zap.String("error", err.Error())) } }
mongo-go-driversession
func createUniqueIndex(collection string, keys ...string) { db := DB.Mongo.Database(setting.DatabaseSetting.DBName).Collection(collection) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) indexView := db.Indexes() keysDoc := bsonx.Doc{} // 複合索引 for _, key := range keys { if strings.HasPrefix(key, "-") { keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1)) } else { keysDoc = keysDoc.Append(key, bsonx.Int32(1)) } } // 建立索引 result, err := indexView.CreateOne( context.Background(), mongo.IndexModel{ Keys: keysDoc, Options: options.Index().SetUnique(true), }, opts, ) if result == "" || err != nil { Logger.Error("EnsureIndex error", zap.String("error", err.Error())) } }
mgo能夠直接構建複合索引,按順序傳入多個參數就能夠createIndex(addrTrxC, "address_id", "asset_id")
,可是mongo-go-driver,須要本身作一下處理app
mgosocket
func FindProNode() ([]DBNode, error) { var nodes []DBNode ms, c := Connect(setting.DatabaseSetting.DBName, superNodeC) defer ms.Close() err := c.Find(M{}).Sort("-vote_count").Limit(10).All(&nodes) if err != nil { return nil, err } return nodes, nil }
mongo-go-driver測試
func FindNodes() ([]DBNode, error) { var nodes []DBNode c := Connect(setting.DatabaseSetting.DBName, superNodeC) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() opts := options.Find().SetSort(bsonx.Doc{{"vote_count", bsonx.Int32(-1)}}) cursor, err := c.Find(ctx, M{}, opts) if err != nil { return nil, err } for cursor.Next(context.Background()) { var node DBNode if err = cursor.Decode(&node); err != nil { return nil, err } else { nodes = append(nodes, node) } } return nodes, nil }
在查詢單個元素的時候,兩個驅動都都方便,可是,當查詢多個元素的時候,mongo-go-driver不能直接解析到數組,須要藉助cursor這個類型,遍歷解析全部的數據(麻煩,須要本身封裝)。
mgo
// 通用 func Insert(db, collection string, docs ...interface{}) error { ms, c := Connect(db, collection) defer ms.Close() return c.Insert(docs...) } //插入 func InsertNode(node DBNode) error { err := Insert(setting.DatabaseSetting.DBName, superNodeC, node) return err }
mongo-go-driver
func Insert(db, collection string, docs ...interface{}) (*mongo.InsertManyResult, error) { c := Connect(db, collection) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() return c.InsertMany(ctx, docs) } func InsertNode(node DBNode) error { _, err := Insert(setting.DatabaseSetting.DBName, superNodeC, node) return err }
插入的區別也不是很大
mgo
func UpsertNode(node DBNode) (*mgo.ChangeInfo, error) { findM := M{"pub_key": node.PubKey} ms, c := Connect(setting.DatabaseSetting.DBName, superNodeC) defer ms.Close() updateM := bson.M{"$inc": M{"vote_count": node.VoteCount}, "$set": M{ "name": node.Name, "address": node.Address, "first_timestamp": node.FirstTimestamp}} return c.Upsert(findM, updateM) }
mongo-go-driver
func Update(db, collection string, query, update interface{}) (*mongo.UpdateResult, error) { c := Connect(db, collection) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() opts := options.Update().SetUpsert(true) return c.UpdateOne(ctx, query, update,opts) }
二者均可以更新一個mgo(update) mongo-go-driver(updateOne),和更新多個mgo(updateAll) mongo-go-driver(updateMany);可是在upsert的時候,mgo直接使用upsert就能夠,mongo-go-driver須要設置opts := options.Update().SetUpsert(true)
mgo
func Remove(db, collection string, query interface{}) error { ms, c := Connect(db, collection) defer ms.Close() return c.Remove(query) } func RemoveNode(pubKey string) error { findM := M{"pub_key": pubKey} err := Remove(setting.DatabaseSetting.DBName, superNodeC, findM) return err }
mongo-go-driver
func Remove(db, collection string, query interface{}) (*mongo.DeleteResult, error) { c := Connect(db, collection) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() return c.DeleteOne(ctx, query) } func RemoveNode(pubKey string) error { findM := M{"pub_key": pubKey} _, err := Remove(setting.DatabaseSetting.DBName, superNodeC, findM) return err }
刪除也比較簡單,大體相同
mongo-go-driver
全部的修改以前須要查詢的,請都使用SessionContext(即都使用事務)
由於多處使用,因此封裝了一個方法; 在這個方法中須要實現的方法是Exec的operator type DBTransaction struct { Commit func(mongo.SessionContext) error Run func(mongo.SessionContext, func(mongo.SessionContext, DBTransaction) error) error Logger *logging.Logger } func NewDBTransaction(logger *logging.Logger) *DBTransaction { var dbTransaction = &DBTransaction{} dbTransaction.SetLogger(logger) dbTransaction.SetRun() dbTransaction.SetCommit() return dbTransaction } func (d *DBTransaction) SetCommit() { d.Commit = func(sctx mongo.SessionContext) error { err := sctx.CommitTransaction(sctx) switch e := err.(type) { case nil: d.Logger.Info("Transaction committed.") return nil default: d.Logger.Error("Error during commit...") return e } } } func (d *DBTransaction) SetRun() { d.Run = func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext, DBTransaction) error) error { err := txnFn(sctx, *d) // Performs transaction. if err == nil { return nil } d.Logger.Error("Transaction aborted. Caught exception during transaction.", zap.String("error", err.Error())) return err } } func (d *DBTransaction) SetLogger(logger *logging.Logger) { d.Logger = logger } func (d *DBTransaction) Exec(mongoClient *mongo.Client, operator func(mongo.SessionContext, DBTransaction) error) error { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() return mongoClient.UseSessionWithOptions( ctx, options.Session().SetDefaultReadPreference(readpref.Primary()), func(sctx mongo.SessionContext) error { return d.Run(sctx, operator) }, ) } //具體調用 func SyncBlockData(node models.DBNode) error { dbTransaction := db_session_service.NewDBTransaction(Logger) // Updates two collections in a transaction. updateEmployeeInfo := func(sctx mongo.SessionContext, d db_session_service.DBTransaction) error { err := sctx.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())), ) if err != nil { return err } err = models.InsertNodeWithSession(sctx, node) if err != nil { _ = sctx.AbortTransaction(sctx) d.Logger.Info("caught exception during transaction, aborting.") return err } return d.Commit(sctx) } return dbTransaction.Exec(models.DB.Mongo, updateEmployeeInfo) }