Fescar 全局鎖介紹

開篇

 這篇文章的目的主要是講解TC的在處理分支事務註冊過程當中對全局鎖的處理流程,理解了全局鎖之後才能明白對DB同一個記錄進行屢次變動是如何解決的。git

如上圖所示,問最終全局事務A對資源R1應該回滾到哪一種狀態?很明顯,若是再根據UndoLog去作回滾,就會發生嚴重問題:覆蓋了全局事務B對資源R1的變動。
 那Fescar是如何解決這個問題呢?答案就是 Fescar的全局寫排它鎖解決方案,在全局事務A執行過程當中全局事務B會由於獲取不到全局鎖而處於等待狀態。github

Fescar 全局鎖處理流程

RM 嘗試獲取全局鎖

public class ConnectionProxy extends AbstractConnectionProxy {
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                // 一、向TC發起註冊操做並檢查是否可以獲取全局鎖
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }

            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                // 二、執行本地的事務的commit操做
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();

        } else {
            targetConnection.commit();
        }
    }

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(
                BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }
}

說明:session

  • RM 執行本地事務提交操做在ConnectionProxy的commit()完成。
  • commit()的過程中按照register->commit的流程執行。
  • register()過程RM會向TC發起註冊請求判斷是否可以獲取全局鎖
  • register()經過DataSourceManager的branchRegister()操做完成。

 

TC處理全局鎖申請流程

public class DefaultCore implements Core {

    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        response.setTransactionId(request.getTransactionId());
        response.setBranchId(
            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                XID.generateXID(request.getTransactionId()), request.getLockKey()));

    }

    public Long branchRegister(BranchType branchType, String resourceId, 
                 String clientId, String xid, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);

        BranchSession branchSession = new BranchSession();
        branchSession.setTransactionId(XID.getTransactionId(xid));
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setApplicationId(globalSession.getApplicationId());
        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);

        // 判斷branchSession是否可以獲取鎖
        if (!branchSession.lock()) {
            throw new TransactionException(LockKeyConflict);
        }
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            throw new TransactionException(FailedToAddBranch);

        }
        return branchSession.getBranchId();
    }

    public boolean lock() throws TransactionException {
        return LockManagerFactory.get().acquireLock(this);
    }

}

說明:數據結構

  • TC 在處理branchRegister()的過程當中會判斷branchResiter請求攜帶的session信息可否獲取全局鎖。
  • branchSession.lock()判斷可否獲取鎖,若是獲取失敗則拋出TransactionException(LockKeyConflict)異常。
  • branchSession.lock()判斷可否獲取鎖,若是獲取成功則將branchSession添加到全局鎖當中。

 

TC 判斷全局鎖分配流程

public class DefaultLockManagerImpl implements LockManager {

    public boolean acquireLock(BranchSession branchSession) throws TransactionException {
        String resourceId = branchSession.getResourceId();
        long transactionId = branchSession.getTransactionId();
        //一、根據resourceId去LOCK_MAP獲取,獲取失敗則新增一個空的對象。
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
        if (dbLockMap == null) {
            LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());
            dbLockMap = LOCK_MAP.get(resourceId);
        }


        ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
        
        // 二、獲取branchSession的全局鎖的key對象
        String lockKey = branchSession.getLockKey();
        if(StringUtils.isEmpty(lockKey)) {
            return true;
        }
        
        // 三、按照分號「;」切割多個LockKey,每一個LockKey按照table:pk1;pk2;pk3格式組裝。
        String[] tableGroupedLockKeys = lockKey.split(";");
        for (String tableGroupedLockKey : tableGroupedLockKeys) {
            int idx = tableGroupedLockKey.indexOf(":");
            if (idx < 0) {
                branchSession.unlock();
                throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());
            }
            // 四、分割獲取branchRegister請求的表名和pks。
            String tableName = tableGroupedLockKey.substring(0, idx);
            String mergedPKs = tableGroupedLockKey.substring(idx + 1);
            // 五、獲取表下的已經加鎖的記錄tableLockMap 
            ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);
            if (tableLockMap == null) {
                dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());
                tableLockMap = dbLockMap.get(tableName);
            }
            // 六、遍歷該表全部pks判斷是否已加鎖。
            String[] pks = mergedPKs.split(",");
            for (String pk : pks) {
                // 七、同一個表的pk按照hash值進行hash分配到tableLockMap當中。
                int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
                Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);
                if (bucketLockMap == null) {
                    tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());
                    bucketLockMap = tableLockMap.get(bucketId);
                }
                // 八、根據pk去獲取bucketLockMap當中獲取鎖對象。
                synchronized (bucketLockMap) {
                    Long lockingTransactionId = bucketLockMap.get(pk);
                    if (lockingTransactionId == null) {
                        // No existing lock
                        // 九、將鎖添加到branchSession當中
                        bucketLockMap.put(pk, transactionId);
                        Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
                        if (keysInHolder == null) {
                            bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                            keysInHolder = bucketHolder.get(bucketLockMap);
                        }
                        keysInHolder.add(pk);

                    } else if (lockingTransactionId.longValue() == transactionId) {
                        // Locked by me
                        continue;
                    } else {
                        // 直接返回異常
                        LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);
                        branchSession.unlock(); // Release all acquired locks.
                        return false;
                    }
                }
            }
        }
        return true;
    }
}

說明:app

  • TC 判斷branchRegister操做可否獲取鎖按照維度層層遞進判斷。
  • 一、先從ResourceId的維度進行判斷, LOCK_MAP.get(resourceId)。
  • 二、再從tableName的維度進行判斷, dbLockMap.get(tableName)。
  • 三、再從pk的hashcode的維度進行判斷,tableLockMap.get(bucketId)。
  • 四、在從pk的維度進行判斷,bucketLockMap.get(pk)。
  • 五、若是按照上述維度判斷後未存在加鎖的branchSession就返回可以加鎖,不然返回不能加鎖。
  • 六、判斷加鎖過程當中處理了冪等,若是本身已經加鎖了能夠再次獲取鎖。

 

TM 全局鎖維護數據結構

private static final ConcurrentHashMap<String, 
                     ConcurrentHashMap<String, 
                     ConcurrentHashMap<Integer, 
                     Map<String, Long>>>> LOCK_MAP = 
                     new ConcurrentHashMap<String, 
                         ConcurrentHashMap<String, 
                        ConcurrentHashMap<Integer, 
                        Map<String, Long>>>>();

說明:ui

  • LOCK_MAP 做爲TC全局鎖的保存結構。
  • 第一層ConcurrentHashMap的key爲resourceId。
  • 第二層ConcurrentHashMap的key爲tableName。
  • 第三層ConcurrentHashMap的key爲pk的hashCode。
  • 第四層的Map的key爲pk,value爲transactionId(RM攜帶過來)

 

- Github:https://github.com/alibaba/fescarthis

- 官方中文介紹:https://github.com/alibaba/fescar/wikispa

相關文章
相關標籤/搜索