這篇文章的目的主要是講解TC在處理分支事務註冊的過程中對全局鎖的處理流程,理解了全局鎖之後才能明白對DB同一條記錄進行多次變更時的衝突是如何解決的。
如上圖所示,問題是:最終全局事務A對資源R1應該回滾到哪一種狀態?很明顯,如果再根據UndoLog去做回滾,就會發生嚴重問題:覆蓋了全局事務B對資源R1的變更。
那Fescar是如何解決這個問題呢?答案就是Fescar的全局寫排他鎖解決方案:在全局事務A執行過程中,全局事務B會因為獲取不到全局鎖而處於等待狀態。
public class ConnectionProxy extends AbstractConnectionProxy { public void commit() throws SQLException { if (context.inGlobalTransaction()) { try { // 一、向TC發起註冊操做並檢查是否可以獲取全局鎖 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e); } try { if (context.hasUndoLog()) { UndoLogManager.flushUndoLogs(this); } // 二、執行本地的事務的commit操做 targetConnection.commit(); } catch (Throwable ex) { report(false); if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } report(true); context.reset(); } else { targetConnection.commit(); } } private void register() throws TransactionException { Long branchId = DataSourceManager.get().branchRegister( BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.buildLockKeys()); context.setBranchId(branchId); } }
說明:RM在提交本地事務之前,會先向TC註冊分支事務並檢查是否能獲取全局鎖(register),接著寫入UndoLog並執行本地事務的commit,最後通過report向TC上報本地提交的結果。
public class DefaultCore implements Core { protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { response.setTransactionId(request.getTransactionId()); response.setBranchId( core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), XID.generateXID(request.getTransactionId()), request.getLockKey())); } public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin); BranchSession branchSession = new BranchSession(); branchSession.setTransactionId(XID.getTransactionId(xid)); branchSession.setBranchId(UUIDGenerator.generateUUID()); branchSession.setApplicationId(globalSession.getApplicationId()); branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup()); branchSession.setBranchType(branchType); branchSession.setResourceId(resourceId); branchSession.setLockKey(lockKeys); branchSession.setClientId(clientId); // 判斷branchSession是否可以獲取鎖 if (!branchSession.lock()) { throw new TransactionException(LockKeyConflict); } try { globalSession.addBranch(branchSession); } catch (RuntimeException ex) { throw new TransactionException(FailedToAddBranch); } return branchSession.getBranchId(); } public boolean lock() throws TransactionException { return LockManagerFactory.get().acquireLock(this); } }
說明:TC收到分支註冊請求後,會創建BranchSession並調用lock()嘗試獲取全局鎖;獲取失敗時拋出LockKeyConflict異常,獲取成功後才把該分支加入GlobalSession。
/**
 * Lock manager excerpt that keeps global locks in the nested in-memory {@code LOCK_MAP}.
 */
public class DefaultLockManagerImpl implements LockManager {

    /**
     * Tries to acquire every lock named in the branch session's lock key.
     *
     * <p>The lock key is split on ";" into per-table groups of the form
     * {@code table:pk1,pk2,pk3}. Each primary key is looked up in a bucket map; a key with
     * no owner is recorded for this transaction, a key already owned by this transaction is
     * skipped, and a key owned by another transaction makes the whole call fail.
     *
     * @param branchSession the branch whose resource id, transaction id, lock key and lock
     *                      holder are used
     * @return {@code true} if the lock key is empty or all keys are now held by this
     *         transaction; {@code false} if some key is held by another transaction (in
     *         which case {@code branchSession.unlock()} has been called)
     * @throws TransactionException declared by the interface; may come from
     *                              {@code branchSession.unlock()}
     * @throws ShouldNeverHappenException if a table group does not contain ":"
     */
    public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        // 1. Look up the per-resource map in LOCK_MAP; create an empty one if it is missing.
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
            // Re-read so that the instance that actually won putIfAbsent is used.
            dbLockMap = LOCK_MAP.get(resourceId);
        }
        // Per-branch record: bucket map -> set of primary keys this branch added to it.
        ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
        // 2. Get the global lock key of the branch session; nothing to lock if it is empty.
        String lockKey = branchSession.getLockKey();
        if(StringUtils.isEmpty(lockKey)) {
            return true;
        }
        // 3. Split on ";" into per-table groups; each group has the form table:pk1,pk2,pk3.
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                // Malformed group: unlock the branch before failing.
                branchSession.unlock();
                throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
            }
            // 4. Separate the table name from the merged primary keys.
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            // 5. Get (or create) the map of locked records for this table.
            ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
                tableLockMap = dbLockMap.get(tableName);
            }
            // 6. Walk every primary key of the table and check whether it is already locked.
            String[] pks = mergedPKs.split(",");
            for (String pk : pks) {
                // 7. Primary keys of one table are spread over buckets by hash code.
                // hashCode() can be negative, so bucketId can be too; it is only a map key here.
                int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                if (bucketLockMap == null) {
                    tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
                    bucketLockMap = tableLockMap.get(bucketId);
                }
                // 8. Look up the current owner of pk; the bucket is a plain HashMap, so all
                //    access to it happens while synchronized on the bucket itself.
                synchronized (bucketLockMap) {
                    Long lockingTransactionId = bucketLockMap.get(pk);
                    if (lockingTransactionId == null) {
                        // No existing lock
                        // 9. Record this transaction as owner and remember the key in the
                        //    branch session's lock holder.
                        bucketLockMap.put(pk, transactionId);
                        // NOTE(review): bucketLockMap is a HashMap used as a key of
                        // bucketHolder, and a HashMap's hashCode depends on its entries,
                        // which change as locks are added/removed — verify that these
                        // lookups stay reliable.
                        Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                        if (keysInHolder == null) {
                            bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                            keysInHolder = bucketHolder.get(bucketLockMap);
                        }
                        keysInHolder.add(pk);
                    } else if (lockingTransactionId.longValue() == transactionId) {
                        // Locked by me
                        continue;
                    } else {
                        // Held by another transaction: log it, release, and report failure.
                        LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
                        // NOTE(review): unlock() is invoked while this bucket's monitor is
                        // held; check what unlock() locks internally — TODO confirm.
                        branchSession.unlock(); // Release all acquired locks.
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
說明:加鎖時按「resourceId → 表名 → bucket → 主鍵」逐層查找;主鍵尚未被加鎖則記錄為當前事務持有,已被當前事務持有則跳過,被其他事務持有則釋放已獲取的鎖並返回false。
/**
 * In-memory global lock table, nested as:
 * resourceId -> table name -> bucket id -> (primary key -> id of the transaction holding the lock).
 */
private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>> LOCK_MAP = new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>>();
說明:LOCK_MAP是保存全局鎖的多層Map,各層的key依次為resourceId、表名、bucketId、主鍵,最內層的value是持有該鎖的transactionId。
- Github:https://github.com/alibaba/fescar
- 官方中文介紹:https://github.com/alibaba/fescar/wiki