MongoDB 事務 —— 多文檔事務實踐篇

MongoDB 在單文檔操做中具備原子性,在多文檔操做中就再也不具備此特性,一般須要藉助事務來實現 ACID 特性。html

事務 API 介紹

客戶端對於事務的操做,都由 MongoDB Client Driver 實現提供相應的 API 接口。MongoDB 4.0 以後才支持事務,對於客戶端驅動版本也要選擇相對應版本。node

本文采用 MongoDB Client Driver 3.5 版本git

會話 Session

Session 是 MongoDB 3.6 以後引入的概念,在之前的版本中,Mongod 進程中的每個請求會建立一個上下文(OperationContext),能夠理解爲一個單行事務,這個單行事務中對於數據、索引、oplog 的修改都是原子性的github

MongoDB 3.6 以後的 Session 本質上也是一個上下文,在這個 Session 會話中多個請求共享一個上下文,爲多文檔事務實現提供了基礎。mongodb

一個知識點:爲什麼 db.coll.count() 在宕機崩潰後常常就不許了?api

緣由在於 表記錄數的更新獨立於數據更新的事務以外,參考文章 mongoing.com/archives/54…bash

事務函數

  • startTransaction()

開啓一個新的事務,以後便可進行 CRUD 操做。session

  • commitTransaction()

提交事務保存數據,在提交以前事務中的變動的數據對外是不可見的。dom

  • abortTransaction()

事務回滾,例如,一部分數據更新失敗,對已修改過的數據也進行回滾。async

  • endSession()

結束本次會話。

Mongo Shell 中簡單實現

var session = db.getMongo().startSession();
session.startTransaction({readConcern: { level: 'majority' },writeConcern: { w: 'majority' }});
var coll = session.getDatabase('test').getCollection('user');

coll.update({name: 'Jack'}, {$set: {age: 18}})

// 成功提交事務
session.commitTransaction();

// 失敗事務回滾
session.abortTransaction();
複製代碼

MongoDB 事務在 Nodejs 中的實踐

爲了更好的理解 MongoDB 事務在 Node.js 中如何應用,列舉一個例子進行說明。

假設咱們如今有這樣一個商城商品下單場景,分爲一個商品表(存儲商品數據、庫存信息),另外一個訂單表(存儲訂單記錄)。每次下單以前須要先校驗庫存是否大於 0,大於 0 的時候扣減商品庫存、建立訂單,不然,提示庫存不足沒法下單。

數據模型

// goods
{
    "_id": ObjectId("5e3b839ec2d95bfeecaad6b8"),
    "goodId":"g1000", // 商品 Id
    "name":"測試商品1", // 商品名稱
    "stock":2, // 商品庫存
    "price":100 // 商品金額
}
// db.goods.insert({ "goodId" : "g1000", "name" : "測試商品1", "stock" : 2, "price" : 100 })
複製代碼
// order_goods
{
    "_id":ObjectId("5e3b8401c2d95bfeecaad6b9"),
    "id":"o10000", // 訂單id
    "goodId":"g1000", // 訂單對應的商品 Id
    "price":100 // 訂單金額
}
// db.order_goods.insert({ id: "o10000", goodId: "g1000", price: 100 })
複製代碼

Node.js 操做 MongoDB 原生 API 實現

注意:在一個事務操做中 readPreference 必須設置爲 primary 節點,不能是 secondary 節點。

db.js

連接 MongoDB,初始化一個實例。

const MongoClient = require('mongodb').MongoClient;
const dbConnectionUrl = 'mongodb://192.168.6.131:27017,192.168.6.131:27018,192.168.6.131:27019/?replicaSet=May&readPreference=secondaryPreferred';
const client = new MongoClient(dbConnectionUrl, {
  useUnifiedTopology: true,
});

let instance = null;

module.exports = {
  dbInstance: async () => {
    if (instance) {
      return instance;
    }

    try {
      instance = await client.connect();
    } catch(err) {
      console.log(`[MongoDB connection] ERROR: ${err}`);
      throw err;
    }

    process.on('exit', () => {
      instance.close();
    });

    return instance;
  }
};
複製代碼

index.js

const db = require('./db');

const testTransaction = async (goodId) => {
  const client = await db.dbInstance();
  const transactionOptions = {
    readConcern: { level: 'majority' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary',
  };

  const session = client.startSession();
  console.log('事務狀態:', session.transaction.state);

  try {
    session.startTransaction(transactionOptions);
    console.log('事務狀態:', session.transaction.state);

    const goodsColl = await client.db('test').collection('goods');
    const orderGoodsColl = await client.db('test').collection('order_goods');
    const { stock, price } = await goodsColl.findOne({ goodId }, { session });
    
    console.log('事務狀態:', session.transaction.state);
    
    if (stock <= 0) {
        throw new Error('庫存不足');
    }

    await goodsColl.updateOne({ goodId }, {
        $inc: { stock: -1 } // 庫存減 1
    })
    await orderGoodsColl.insertOne({ id: Math.floor(Math.random() * 1000),  goodId, price  }, { session });
    await session.commitTransaction();
  } catch(err) {
    console.log(`[MongoDB transaction] ERROR: ${err}`);
    await session.abortTransaction();
  } finally {
    await session.endSession();
    console.log('事務狀態:', session.transaction.state);
  }
}

testTransaction('g1000')
複製代碼

運行測試

每一次事務函數執行以後,查看當前事務狀態。

node index
事務狀態: NO_TRANSACTION
事務狀態: STARTING_TRANSACTION
事務狀態: TRANSACTION_IN_PROGRESS
事務狀態: TRANSACTION_COMMITTED
複製代碼
相關文章
相關標籤/搜索