爲了進行復雜信息的存儲和查詢,服務端系統每每須要數據庫操做。數據庫分爲關係型數據庫和非關係型數據庫,關係型數據庫有MySQL、Oracle、SQL Server等,非關係型數據庫有Redis(經常使用來作緩存)、MongoDB等。MySQL是目前很流行的數據庫,本文將要介紹如何在node服務中進行MySQL數據庫操做。node
npm install mysql --save
或者mysql
yarn add mysql
要想進行數據庫操做就須要和數據庫創建鏈接,而後經過鏈接進行數據庫的操做。MySQL的數據庫鏈接方式有如下幾種:git
mysqljs文檔中推薦使用第一種方式:每次請求創建一個鏈接,可是因爲頻繁的創建、關閉數據庫鏈接,會極大的下降系統的性能,因此我選擇了使用鏈接池的方式,若是對性能有更高的要求,安裝了MySQL 集羣,能夠選擇使用鏈接池集羣。github
將數據庫相關的配置添加到公用的配置文件中,方便項目的初始化。sql
module.exports = { … // mysql數據庫配置 mysql: { // 主機 host: 'localhost', // 端口 port: 3306, // 用戶名 user: 'root', // 密碼 password: '123456', // 數據庫名 database: 'server-demo', // 鏈接池容許建立的最大鏈接數,默認值爲10 connectionLimit: 50, // 容許掛起的最大鏈接數,默認值爲0,表明掛起的鏈接數無限制 queueLimit: 0 } };
connectionLimit 和 queueLimit 是數據鏈接池特有的配置項。數據庫
/** * 數據庫鏈接池 */ const mysql = require('mysql'); const config = require('../config'); // 建立數據庫鏈接池 const pool = mysql.createPool(config.mysql); pool.on('acquire', function (connection) { console.log(`獲取數據庫鏈接 [${connection.threadId}]`); }); pool.on('connection', function (connection) { console.log(`建立數據庫鏈接 [${connection.threadId}]`); }); pool.on('enqueue', function () { console.log('正在等待可用數據庫鏈接'); }); pool.on('release', function (connection) { console.log(`數據庫鏈接 [${connection.threadId}] 已釋放`); }); module.exports = pool;
建立數據庫鏈接池pool後,就能夠經過pool獲取數據庫鏈接了,另外經過監聽鏈接池的事件能夠了解鏈接池中鏈接的使用狀況。
若是將connectionLimit 設爲2,queueLimit 設爲0,當同時有5個請求獲取數據庫鏈接時,線程池的事件日誌以下:npm
正在等待可用數據庫鏈接 正在等待可用數據庫鏈接 正在等待可用數據庫鏈接 建立數據庫鏈接 [1011] 獲取數據庫鏈接 [1011] 數據庫鏈接 [1011] 已釋放 獲取數據庫鏈接 [1011] 建立數據庫鏈接 [1012] 獲取數據庫鏈接 [1012] 數據庫鏈接 [1011] 已釋放 獲取數據庫鏈接 [1011] 數據庫鏈接 [1012] 已釋放 獲取數據庫鏈接 [1012] 數據庫鏈接 [1011] 已釋放 數據庫鏈接 [1012] 已釋放
因爲線程池容許的最大鏈接數是2,5個請求中會有2個請求可以獲得鏈接,另外3個請求掛起等待可用鏈接。因爲建立數據庫鏈接的代價比較大,線程池在建立鏈接時採用懶漢式,也就是,用到時才建立。先獲得鏈接的請求在完成操做後釋放鏈接,放回到鏈接池,而後掛起的請求從線程池取出空閒的鏈接進行操做。數組
因爲mysql 模塊的接口都爲回調方式的,爲了操做方便簡單地將接口封裝爲Promise,相關方法封裝以下:緩存
const pool = require('./pool'); // 獲取鏈接 function getConnection () { return new Promise((resolve, reject) => { pool.getConnection((err, connection) => { if (err) { console.error('獲取數據庫鏈接失敗!', err) reject(err); } else { resolve(connection); } }); }); } // 開始數據庫事務 function beginTransaction (connection) { return new Promise((resolve, reject) => { connection.beginTransaction(err => { if (err) { reject(err); } else { resolve(); } }); }); } // 提交數據庫操做 function commit (connection) { return new Promise((resolve, reject) => { connection.commit(err => { if (err) { reject(err); } else { resolve(); } }); }) } // 回滾數據庫操做 function rollback (connection) { return new Promise((resolve, reject) => { connection.rollback(err => { if (err) { reject(err); } else { resolve(); } }); }) }
對於不須要使用事務的普通操做,獲取數據庫鏈接connection後,使用connection進行數據庫操做,完成後釋放鏈接到鏈接池,則執行完成一次操做。框架
/** * 執行數據庫操做【適用於不須要事務的查詢以及單條的增、刪、改操做】 * 示例: * let func = async function(conn, projectId, memberId) { ... }; * await execute( func, projectId, memberId); * @param func 具體的數據庫操做異步方法(第一個參數必須爲數據庫鏈接對象connection) * @param params func方法的參數(不包含第一個參數 connection) * @returns {Promise.<*>} func方法執行後的返回值 */ async function execute (func, ...params) { let connection = null; try { connection = await getConnection() let result = await func(connection, ...params); return result } finally { connection && connection.release && connection.release(); } }
對於不少業務都須要執行事務操做,例如:銀行轉帳,A帳戶轉帳給B帳戶 100元,這個業務操做須要執行兩步,從A帳戶減去100元,而後給B帳戶增長100元。兩個子操做必須所有執行成功才能完成完整的業務操做,若是任意子操做執行失敗就須要撤銷以前的操做,進行回滾。
對於須要使用事務的操做,獲取數據庫鏈接connection後,首先須要調用connection.beginTransaction() 開始事務,而後使用connection進行多步操做,完成後執行connection.commit() 進行提交,則執行完成一次事務操做。若是在執行過程當中出現了異常,則執行connection.rollback() 進行回滾操做。
/** * 執行數據庫事務操做【適用於增、刪、改多個操做的執行,若是中間數據操做出現異常則以前的數據庫操做所有回滾】 * 示例: * let func = async function(conn) { ... }; * await executeTransaction(func); * @param func 具體的數據庫操做異步方法(第一個參數必須爲數據庫鏈接對象connection) * @returns {Promise.<*>} func方法執行後的返回值 */ async function executeTransaction(func) { const connection = await getConnection(); await beginTransaction(connection); let result = null; try { result = await func(connection); await commit(connection); return result } catch (err) { console.error('事務執行失敗,操做回滾'); await rollback(connection); throw err; } finally { connection && connection.release && connection.release(); } }
增刪改查是處理數據的基本原子操做,將這些操做根據操做的特色進行簡單的封裝。
/** * 查詢操做 * @param connection 鏈接 * @param sql SQL語句 * @param val SQL參數 * @returns {Promise} resolve查詢到的數據數組 */ function query (connection, sql, val) { // console.info('sql執行query操做:\n', sql, '\n', val); return new Promise((resolve, reject) => { connection.query(sql, val, (err, rows) => { if (err) { console.error('sql執行失敗!', sql, '\n', val); reject(err); } else { let results = JSON.parse(JSON.stringify(rows)); resolve(results); } }); }); } /** * 查詢單條數據操做 * @param connection 鏈接 * @param sql SQL語句 * @param val SQL參數 * @returns {Promise} resolve查詢到的數據對象 */ function queryOne (connection, sql, val) { return new Promise((resolve, reject) => { query(connection, sql, val).then( results => { let result = results.length > 0 ? results[0] : null; resolve(result); }, err => reject(err) ) }); } /** * 新增數據操做 * @param connection 鏈接 * @param sql SQL語句 * @param val SQL參數 * @param {boolean} skipId 跳過自動添加ID, false: 自動添加id,true: 不添加id * @returns {Promise} resolve 自動生成的id */ function insert (connection, sql, val, skipId) { let id = val.id; if (!id && !skipId) { id = uuid(); val = {id, ...val}; } return new Promise((resolve, reject) => { // console.info('sql執行insert操做:\n', sql, '\n', val); connection.query(sql, val, (err, results) => { if (err) { console.error('sql執行失敗!', sql, '\n', val); reject(err); } else { resolve(id); } }); }); } /** * 更新操做 * @param connection 鏈接 * @param sql SQL語句 * @param val SQL參數 * @returns {Promise} resolve 更新數據的行數 */ function update (connection, sql, val) { // console.info('sql執行update操做:\n', sql, '\n', val); return new Promise((resolve, reject) => { connection.query(sql, val, (err, results) => { if (err) { console.error('sql執行失敗!', sql, '\n', val); reject(err); } else { resolve(results.affectedRows); } }); }); } /** * 刪除操做 * @param connection 鏈接 * @param sql SQL語句 * @param val SQL參數 * @returns {Promise} resolve 刪除數據的行數 */ function del (connection, sql, val) { // console.info('sql執行delete操做:\n', sql, '\n', val); return new Promise((resolve, reject) => { connection.query(sql, val, (err, results) => { if (err) { console.error('sql執行失敗!', sql, '\n', val); reject(err); } else { // console.log('delete result', results); resolve(results.affectedRows); } }); }); }
將代碼分層能夠下降代碼的耦合度,提升可複用性、可維護性,這裏將代碼分紅了3層:Dao層、Service層和Controller層。
const { query, queryOne, update, insert, del } = require('../db/curd'); class UserDao { static async queryUserById (connection, id) { const sql = `SELECT user.id, user.account, user.name, user.email, user.phone, user.birthday, user.enable, user.deleteFlag, user.creator, user.createTime, user.updater, user.updateTime FROM sys_user user WHERE user.id = ?`; const user = await queryOne(connection, sql, id); return user; } … } module.exports = UserDao;
const { execute, executeTransaction } = require('../db/execute'); const UserDao = require('../dao/userDao'); class UserService { static async findUserById (id) { return await execute(UserDao.queryUserById, id); } … } module.exports = UserService;
對於複雜些的業務邏輯可使用匿名函數來實現:
static async findUserWithRoles (id) { return await execute (async connection => { const user = await UserDao.queryUserById(connection, id); if (user) { user.roles = await RoleDao.queryRolesByUserId(connection, id); } return user; }); }
若是要執行事務操做,則須要使用executeTransaction 方法:
static async updateUserRoleRelations (userId, roleIds) { return await executeTransaction(async connection => { const relations = await UserDao.queryUserRoleRelations(connection, userId); const oldRoleIds = relations.map(item => item.roleId); const newRoleIds = roleIds || []; // 新增的角色數組 const addList = []; // 移除的角色數組 const removeList = []; newRoleIds.forEach(roleId => { if (oldRoleIds.indexOf(roleId) === -1) { addList.push(roleId); } }); oldRoleIds.forEach(roleId => { if (newRoleIds.indexOf(roleId) === -1) { removeList.push(roleId); } }); if (addList.length > 0) { await UserDao.insertUserRoleRelations(connection, userId, addList); } if (removeList.length > 0) { await UserDao.deleteUserRoleRelations(connection, userId, removeList); } }); }
const UserService = require('../service/userService'); class UserControler { static async getUserById (ctx) { // 用戶ID const id = ctx.params.id; // 是否包含用戶角色信息,若是withRoles 爲 "1" 表示須要包含角色信息 const withRoles = ctx.query.withRoles; let user; if (withRoles === '1') { user = await UserService.findUserWithRoles(id); } else { user = await UserService.findUserById(id); } if (user) { ctx.body = user; } else { ctx.body = { code: 1004, msg: '用戶不存在!' } } } … } module.exports = UserControler;
此示例基於Koa框架,controller 層實現完成後須要添加路由:
const router = new KoaRouter(); const UserController = require('./controler/userControler'); // 獲取指定ID的用戶 router.get('/users/:id', UserController.getUserById); // 獲取全部用戶 router.get('/users', UserControler.getUsers);
對於Koa框架如何使用,這裏再也不介紹,路由添加完畢後,啓動服務,便可使用這些接口,若是本地服務啓動的端口爲3000,接口請求地址以下:
本文介紹了mysql模塊的基本使用,對其進行了簡單封裝,並提供了使用示例。除了使用mysql模塊來操做數據庫,也可使用mysql2模塊,mysql2的基本用法與mysql一致,另外mysql2還支持Promise,使用起來更方便。本文相關的代碼已提交到GitHub以供參考,項目地址:https://github.com/liulinsp/node-server-typeorm-demo。
做者:宜信技術學院 劉琳