以前的文章《Java分佈式鎖實現》中列舉了分佈式鎖的3種實現方式,分別是基於數據庫實現,基於緩存實現和基於zookeeper實現。三種實現方式各有可取之處,本篇文章就詳細講解一下Java分佈式鎖之基於數據庫的實現方式,也是最簡單最易理解的實現方式。html
首先,先來闡述下「鎖」的概念,鎖做爲一種安全防護工具,既能上鎖防止別人打開,又能讓持有鑰匙的人打開鎖,這是鎖的基本功能。那再來講一下「分佈式鎖」,分佈式鎖是在分佈式系統(多個獨立運行系統)內的鎖,相對來講,這把鎖的安全級別以及做用範圍更大,因此從設計上就要考慮更多東西。java
如今來講,怎麼基於數據庫實現這把分佈式鎖。其實說白了就是,把鎖做爲數據資源存入數據庫,當持有這把鎖的訪問者來決定是否開鎖。git
如下詳細講解了在多個應用服務裏,怎樣用數據庫去實現分佈式鎖。github
結合案例:數據庫
1.客戶app取出交易(同一個客戶在某一個時間點只能對某種資產作取現操做)緩存
2.交易重試補償(交易過程服務宕機,掃描重試補償)安全
數據庫鎖表的表結構以下:併發
field | type | comment |
ID | bigint | 主鍵 |
OUTER_SERIAL_NO | varchar | 流水號 |
CUST_NO | char | 客戶號 |
SOURCE_CODE | varchar | 鎖操做 |
THREAD_NO | varchar | 線程號 |
STATUS | char | 鎖狀態 |
REMARK | varchar | 備註 |
CREATED_AT | timestamp | 建立時間 |
UPDATED_AT | timestamp | 更新時間 |
做爲鎖的必要屬性有5個:系統流水號,客戶號,鎖操做,線程號和鎖狀態,下面來解釋一下每種屬性app
流水號:鎖的具體指向,好比能夠是產品,能夠是交易流水號(後面會說到交易同步鎖、交易補償鎖的使用方式)dom
客戶號:客戶的惟一標識
鎖操做:客戶的某種操做,好比客戶取現操做,取現補償重試操做
線程號:當前操做線程的線程號,好比取當前線程的uuid
鎖狀態:P處理中,F失敗,Y成功
代碼的目錄結構以下:
主要貼一下鎖操做的核心代碼實現:
鎖接口定義:DbLockManager.java
/** * 鎖接口 <br> * * @Author fugaoyang * */ public interface DbLockManager { /** * 加鎖 */ boolean lock(String outerSerialNo, String custNo, LockSource source); /** * 解鎖 */ void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus); }
鎖接口實現類:DbLockManagerImpl.java
/** * * 數據庫鎖實現<br> * * @author fugaoyang * */ @Service public class DbLockManagerImpl implements DbLockManager { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); @Autowired private DbSyncLockMapper lockMapper; @Transactional public boolean lock(String outerSerialNo, String custNo, LockSource source) { boolean isLock = false; TradeSyncLock lock = null; try { lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null == lock) { lock = new TradeSyncLock(); createLock(lock, outerSerialNo, custNo, source); int num = lockMapper.insert(lock); if (num == 1) { isLock = true; } LOG.info(ThreadLogUtils.getLogPrefix() + "加入鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode()); return isLock; } // 根據交易類型進行加鎖 isLock = switchSynsLock(lock, source); LOG.info(ThreadLogUtils.getLogPrefix() + "更新鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode()); } catch (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() + "交易加鎖異常, 客戶號:" + custNo, e); } return isLock; } @Transactional public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) { try { TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null != lock) { lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(), ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid()); } LOG.info(ThreadLogUtils.getLogPrefix() + "釋放鎖,客戶號[{}],鎖類型[{}]", custNo, source.getCode()); } catch (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() + "釋放鎖異常, 客戶號:{}", custNo, e); } } /** * 匹配加鎖 */ private boolean switchSynsLock(TradeSyncLock lock, LockSource source) { boolean isLock = false; switch (source) { case WITHDRAW: ; isLock = tradeSynsLock(lock); break; case WITHDRAW_RETRY: ; isLock = retrySynsLock(lock); break; default: ; } return isLock; } /** * 交易同步鎖 */ private boolean tradeSynsLock(TradeSyncLock lock) { // 處理中的不加鎖,即不執行交易操做 if (LockStatus.P.getName().equals(lock.getStatus())) { return false; } int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(), ThreadLogUtils.getCurrThreadUuid(), null); if (num == 1) { return true; } return false; } /** * 補償同步鎖 */ private boolean retrySynsLock(TradeSyncLock lock) { // 處理中或處理完成的不加鎖,即不執行補償操做 if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) { return false; } int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(), ThreadLogUtils.getCurrThreadUuid(), null); if (num == 1) { return true; } return false; } private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) { lock.setOuterSerialNo(outerSerialNo); lock.setCustNo(custNo); lock.setSourceCode(source.getCode()); lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid()); lock.setStatus(LockStatus.P.getName()); lock.setRemark(source.getDesc()); } }
獲取當前線程號以及打印uuid工具類ThreadLogUtils.Java
/** * * 線程處理<br> * * @author fugaoyang * */ public class ThreadLogUtils { private static ThreadLogUtils instance = null; private ThreadLogUtils() { setInstance(this); } // 初始化標誌 private static final Object __noop = new Object(); private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() { @Override protected Object initialValue() { return null; } }; // 當前線程的UUID信息,主要用於打印日誌; private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() { @Override protected String initialValue() { return UUID.randomUUID().toString()/* .toUpperCase() */; } }; private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() { @Override protected String initialValue() { return UUIDGenerator.getUuid(); } }; public static void clear(Boolean isNew) { if (isNew) { currLogUuid.remove(); __flag.remove(); currThreadUuid.remove(); } } public static String getCurrLogUuid() { if (!isInitialized()) { throw new IllegalStateException("TLS未初始化"); } return currLogUuid.get(); } public static String getCurrThreadUuid() { return currThreadUuid.get(); } public static void clearCurrThreadUuid() { currThreadUuid.remove(); } public static String getLogPrefix() { if (!isInitialized()) { return ""; } return "<uuid=" + getCurrLogUuid() + ">"; } private static boolean isInitialized() { return __flag.get() != null; } /** * 初始化上下文,若是已經初始化則返回false,不然返回true<br/> * * @return */ public static boolean initialize() { if (isInitialized()) { return false; } __flag.set(__noop); return true; } private static void setInstance(ThreadLogUtils instance) { ThreadLogUtils.instance = instance; } public static ThreadLogUtils getInstance() { return instance; } }
兩種鎖的實現的大體思路以下:
1.交易同步鎖
當一個客戶在app取現,第一次進入時,會插入一條當前線程,狀態是P,操做是取現的鎖,取現成功後根據當前線程號會更新成功;
當一個客戶同時多個取現操做時,只有一個取現操做會加鎖成功,其它會加鎖失敗;
當一個客戶已經在取現中,這時數據庫已經有一條狀態P的鎖,該客戶同時又作了取現,這個取現動做會嘗試加鎖而退出;
2.交易重試補償鎖
1.當一個客戶取現加鎖成功,因調用第三方支付接口超時時,後臺會對該筆交易從新發起重試打款操做,這時會新加一條當前交易流水號,當前線程號,狀態是P,操做是取現重試的鎖,重試的支付結果是成功的話,更新該條鎖數據爲Y狀態,不然更新該條數據爲F狀態;
2.當重試支付失敗後,再去重試打款時,發現鎖的狀態是F,這時把F更新爲P,繼續重試,根據重試結果更新鎖狀態。
上面實現的是一個最基本的數據庫分佈式鎖,知足的併發量也是基於數據庫所能扛得住的,性能基本能夠知足普通的交易量。
後續能夠優化的部分:
1.當一個用戶同時屢次獲取lock時,由於目前是用的樂觀鎖,只會有一個加鎖成功,能夠優化成加入while(true)循環獲取lock,當失敗次數到達指定次數時退出,當前的操做結束。
2.當鎖表數據量隨着時間增大時,能夠考慮按用戶對鎖表進行分表分庫,以減少數據庫方面的壓力。
3.對鎖的操做能夠抽象出來,做爲抽象實現,好比具體的取現操做只關心取現這個業務實現。
由於時間有限,寫的比較倉促,但願你們有問題能夠提出,相互探討~~
完整示例代碼後續會更新到github。