分佈式鎖優化方案

先來段提神醒腦的問題場景描述:java

在業務的某一環節,員工獲取某張表的數據進行處理,要求不一樣的員工獲取不一樣(id)的數據。(好比張三獲取了id=1的這條數據,其它員工就不能獲取該數據,轉而獲取其它)redis

STOP!!!數據庫

讀者能夠先思考下,若是是你,會怎麼處理這個問題。以後,再和筆者的優化方案做比較。相信我,這樣更有收穫。多線程

 

 

 

 

 

 

 

 

##############             我是給讀者思考空間的帥氣的分割線         ################################app

 

由於是集羣環境,需經過分佈式鎖(基於redis)進行處理。分佈式

 

原代碼的邏輯以下:
獲取鎖setnx(lock_key,overtime),成功後獲取list,而後get(0)。finally中釋放鎖。性能

 

原邏輯中,先獲取鎖,成功後再拿列表數據,取第1條。比較容易能想到,若是對列表list中的每一個對象單獨加鎖,多個線程間會有更少的資源競爭,性能也所以提高。
因而構思出方案一:單獨對象鎖方案優化

步驟1:
鏈接redis,若是此處拋異常,進行重試操做3次;重試3次依然不成,中斷。
鏈接成功,從redis存儲的指定set集合(payment_handler_set)上獲取數據。spa

步驟2:
若是步驟1的payment_handler_set爲空,或者payment_handler_set不存在,則從數據庫獲取符合條件的數據。線程

    若是無符合條件數據,return "NoData";
    不然,將這些數據的id組成set,放入redis。
    【注:set元素最大能夠包含(2的32次方-1)個元素,目測不會有溢出問題;但考慮到步驟3的亂序操做,這裏從數據庫獲取符合條件的前300條數據】
       
步驟3:
前兩部結束後,或得到java代碼中的paymentSet,做爲數據副本。對paymentSet做亂序操做。

【注:加上「亂序操做」,不一樣的線程獲取的list中元素的對象次序隨機,減小資源競爭】

步驟4:
Payment p = null;
循環paymentSet,依次獲取對象鎖,setnx(對象id,超時時間)。
A.返回1,獲取成功;
    首先檢查該對象在數據庫中的狀態,是否還符合條件;
    【注:這裏的檢查是有必要的。
    線程t1在步驟2從數據庫中獲取了java版副本paymentSet1,同時t2獲取了paymentSet2。而後paymentSet1中的id1處理完數據,鎖已釋放;paymentSet2就不該該再處理id1數據了】
        
    若是符合條件,根據id獲取payment給p賦值,break。
    不然,表示該對象已處理完,從redis中的payment_handler_set中移除當前對象,釋放該對象鎖,continue;

B.返回0,獲取失敗,表示該對象正在被其它線程處理,continue。

循環結束,判斷p是否爲null:
    若是是,表示paymentSet中無可用對象,return "NoData";
    不然,進行業務處理,finally中釋放鎖。

 

方案 一 over!!

其實,方案一的核心思路,就像前文說的,對list中的每一個對象加鎖。

 

 

那是一個陽光明媚的週六,本覺得搞定方案一後,能夠宣告收工,去吃個火鍋唱個歌了……可是!!!

筆者在檢查方案一,查閱redis相關資料的時候想到:彷佛還有更好的方案。

方案二:進階的操做鎖(推薦)

payment_handler_dataset:存放待處理的數據
payment_handler_operset:存放正在處理的數據

步驟1:
鏈接redis,若是此處拋異常,進行重試操做3次;重試3次依然不成,中斷。
鏈接成功,從redis存儲的指定set集合(payment_handler_dataset)上獲取數據。

步驟2:
若是步驟1的payment_handler_dataset爲空,或者payment_handler_dataset不存在,則從數據庫獲取符合條件,而且不在payment_handler_operset中的數據。

    若是無符合條件數據,return "NoData";
    不然,以setnx(update_handler_set,超時時間)方式,獲取更新數據操做鎖:
        返回1,獲取成功:將這些數據的id組成set,放入redis中的payment_handler_dataset(考慮到,可能有線程卡死的狀況,數據以watch方式更新)。finally中釋放鎖。

【注:set元素最大能夠包含(2的32次方-1)個元素,目測不會有溢出問題。但數據過多的話,這一步能夠限制set的大小,好比:只取前500條數據(具體的限制到多少,根據實際狀況調整)】
        返回0,獲取失敗,表示redis中的該集合數據,正由其它線程更新。能夠sleep(1000),return 步驟1。

步驟3:

payment_handler_dataset以SPOP命令(隨機移除並返回一個元素)獲取元素String randId

       若是randId爲nil,表示集合中已經無元素,return 步驟1。

       不然,先將randId放入payment_handler_operset,表示該數據正在被操做。
             而後根據randId獲取payment,進行相應的業務處理。
             finally中將randId移出payment_handler_operset,表示該數據操做完成。
       【注:這裏能夠用多線程來寫,設置超時時間,做線程中斷】

 

方案有了,只差代碼。筆者懶,讀者先自行腦補吧(或許我之後會補上具體代碼實現(⊙﹏⊙)b)……

#########################################

現把當年承諾的代碼補上:

/**
 * getPayment:(獲取數據). <br/>
 *
 * @author liuzijian
 * @since JDK 1.8
 */
public Payment getPayment(Long tenantId,Long userId,Integer status,boolean isQingDan){
    final String dataSetKey = getByTenant(wait_oper, tenantId,status, isQingDan);   //待操做
    final String operingSetKey = getByTenant(doing_oper, tenantId,status, isQingDan);   //操做中
    Payment res = null;
    try {
	long id = redisClient.spop(dataSetKey);
	if(id==0){  
	    /** 無數據 **/
	    logger.info("redis中「{}」中已無數據,嘗試從數據庫中獲取",dataSetKey);
	    
	    /** 查詢數據庫中待處理數據,最多獲取500條 **/    //TODO 這裏能夠優化成單獨線程寫,其它線程等待
	    List<Long> idList = getMapper().getWaitOperData(tenantId, status, isQingDan);
	    /** 過濾掉在redis已操做集合中的 **/
	    Iterator<Long> itea = idList.iterator();
	    while(itea.hasNext()){
		Long tempId = itea.next();
		if(redisClient.sIsMember(operingSetKey, tempId)){
		    logger.info("過濾掉redis已操做集合中的數據:id={}",tempId);
		    itea.remove();
		}
	    }
	    
	    if(CollectionUtils.isEmpty(idList)){   //數據庫中無符合條件的數據
		logger.info("數據庫中一樣無status={}的數據,返回null",status);
		return res;
	    }else{
		try {
		    final String updateLock = getByTenant(update, tenantId, status, isQingDan);
		    if(distributedLock.tryLock(updateLock, 3000)){
			logger.info("成功獲取了分分佈式鎖lock={},對redis數據set={}進行更新,更新的內容ids={}:",updateLock,dataSetKey,idList);
			redisClient.sAdd(dataSetKey, idList.toArray());
		    }else{
			TimeUnit.MILLISECONDS.sleep(500L);
		    }
		    
		    return getPayment(tenantId,userId,status,isQingDan);  //數據更新後,再次調用本方法,從新獲取
		} catch (DistributedLockException e) {
		    logger.error("獲取分佈式鎖error:"+Utils.getFullErrorMessage(e));
		} catch (InterruptedException e) {
		    logger.error("線程沉睡error:"+Utils.getFullErrorMessage(e));
		}
	    }
	    
	}else{
	    /** 有數據 **/
	    res = findOne(id);
	    logger.info("獲取了id={}的數據",id);
				
	    redisClient.sAdd(operingSetKey, id);    //記錄正在操做的payment
	    
	    /** 業務邏輯部分,記錄操做人員等 start **/
	    /** 狀態修改,操做人員記錄 **/
	    if (PaymentStatus.INPUT_WAIT.getValue().equals(status)){
		res = checkDataId(res);
		res.setEntryBy(userId);
		res.setStatus(PaymentStatus.INPUT_ING.getValue());
	    } else if (PaymentStatus.CHECK_WAIT.getValue().equals(status)) {
		res.setVerifyBy(userId);
		res.setStatus(PaymentStatus.CHECK_ING.getValue());
	    }
	    res.setUserId(userId);
	    updateSelective(res);
	    /** 業務邏輯部分,記錄操做人員等 end **/

	    redisClient.sRem(operingSetKey, id);    //數據庫記錄id後,redis中可清掉id
	    
	}
    } catch (CacheException e) {
	logger.error("redis隨機彈出元素時error:"+Utils.getFullErrorMessage(e));
    }
    return res;
}
相關文章
相關標籤/搜索