先來段提神醒腦的問題場景描述:java
在業務的某一環節,員工獲取某張表的數據進行處理,要求不一樣的員工獲取不一樣(id)的數據。(好比張三獲取了id=1的這條數據,其它員工就不能獲取該數據,轉而獲取其它)redis
STOP!!!數據庫
讀者能夠先思考下,若是是你,會怎麼處理這個問題。以後,再和筆者的優化方案做比較。相信我,這樣更有收穫。多線程
############## 我是給讀者思考空間的帥氣的分割線 ################################app
由於是集羣環境,需經過分佈式鎖(基於redis)進行處理。分佈式
原代碼的邏輯以下:
獲取鎖setnx(lock_key,overtime),成功後獲取list,而後get(0)。finally中釋放鎖。性能
原邏輯中,先獲取鎖,成功後再拿列表數據,取第1條。比較容易能想到,若是對列表list中的每一個對象單獨加鎖,多個線程間會有更少的資源競爭,性能也所以提高。
因而構思出方案一:單獨對象鎖方案優化
步驟1:
鏈接redis,若是此處拋異常,進行重試操做3次;重試3次依然不成,中斷。
鏈接成功,從redis存儲的指定set集合(payment_handler_set)上獲取數據。spa
步驟2:
若是步驟1的payment_handler_set爲空,或者payment_handler_set不存在,則從數據庫獲取符合條件的數據。線程
若是無符合條件數據,return "NoData";
不然,將這些數據的id組成set,放入redis。
【注:set元素最大能夠包含(2的32次方-1)個元素,目測不會有溢出問題;但考慮到步驟3的亂序操做,這裏從數據庫獲取符合條件的前300條數據】
步驟3:
前兩部結束後,或得到java代碼中的paymentSet,做爲數據副本。對paymentSet做亂序操做。
【注:加上「亂序操做」,不一樣的線程獲取的list中元素的對象次序隨機,減小資源競爭】
步驟4:
Payment p = null;
循環paymentSet,依次獲取對象鎖,setnx(對象id,超時時間)。
A.返回1,獲取成功;
首先檢查該對象在數據庫中的狀態,是否還符合條件;
【注:這裏的檢查是有必要的。
線程t1在步驟2從數據庫中獲取了java版副本paymentSet1,同時t2獲取了paymentSet2。而後paymentSet1中的id1處理完數據,鎖已釋放;paymentSet2就不該該再處理id1數據了】
若是符合條件,根據id獲取payment給p賦值,break。
不然,表示該對象已處理完,從redis中的payment_handler_set中移除當前對象,釋放該對象鎖,continue;
B.返回0,獲取失敗,表示該對象正在被其它線程處理,continue。
循環結束,判斷p是否爲null:
若是是,表示paymentSet中無可用對象,return "NoData";
不然,進行業務處理,finally中釋放鎖。
方案 一 over!!
其實,方案一的核心思路,就像前文說的,對list中的每一個對象加鎖。
那是一個陽光明媚的週六,本覺得搞定方案一後,能夠宣告收工,去吃個火鍋唱個歌了……可是!!!
筆者在檢查方案一,查閱redis相關資料的時候想到:彷佛還有更好的方案。
方案二:進階的操做鎖(推薦)
payment_handler_dataset:存放待處理的數據
payment_handler_operset:存放正在處理的數據
步驟1:
鏈接redis,若是此處拋異常,進行重試操做3次;重試3次依然不成,中斷。
鏈接成功,從redis存儲的指定set集合(payment_handler_dataset)上獲取數據。
步驟2:
若是步驟1的payment_handler_dataset爲空,或者payment_handler_dataset不存在,則從數據庫獲取符合條件,而且不在payment_handler_operset中的數據。
若是無符合條件數據,return "NoData";
不然,以setnx(update_handler_set,超時時間)方式,獲取更新數據操做鎖:
返回1,獲取成功:將這些數據的id組成set,放入redis中的payment_handler_dataset(考慮到,可能有線程卡死的狀況,數據以watch方式更新)。finally中釋放鎖。
【注:set元素最大能夠包含(2的32次方-1)個元素,目測不會有溢出問題。但數據過多的話,這一步能夠限制set的大小,好比:只取前500條數據(具體的限制到多少,根據實際狀況調整)】
返回0,獲取失敗,表示redis中的該集合數據,正由其它線程更新。能夠sleep(1000),return 步驟1。
步驟3:
payment_handler_dataset以SPOP命令(隨機移除並返回一個元素)獲取元素String randId
若是randId爲nil,表示集合中已經無元素,return 步驟1。
不然,先將randId放入payment_handler_operset,表示該數據正在被操做。
而後根據randId獲取payment,進行相應的業務處理。
finally中將randId移出payment_handler_operset,表示該數據操做完成。
【注:這裏能夠用多線程來寫,設置超時時間,做線程中斷】
方案有了,只差代碼。筆者懶,讀者先自行腦補吧(或許我之後會補上具體代碼實現(⊙﹏⊙)b)……
#########################################
現把當年承諾的代碼補上:
/** * getPayment:(獲取數據). <br/> * * @author liuzijian * @since JDK 1.8 */ public Payment getPayment(Long tenantId,Long userId,Integer status,boolean isQingDan){ final String dataSetKey = getByTenant(wait_oper, tenantId,status, isQingDan); //待操做 final String operingSetKey = getByTenant(doing_oper, tenantId,status, isQingDan); //操做中 Payment res = null; try { long id = redisClient.spop(dataSetKey); if(id==0){ /** 無數據 **/ logger.info("redis中「{}」中已無數據,嘗試從數據庫中獲取",dataSetKey); /** 查詢數據庫中待處理數據,最多獲取500條 **/ //TODO 這裏能夠優化成單獨線程寫,其它線程等待 List<Long> idList = getMapper().getWaitOperData(tenantId, status, isQingDan); /** 過濾掉在redis已操做集合中的 **/ Iterator<Long> itea = idList.iterator(); while(itea.hasNext()){ Long tempId = itea.next(); if(redisClient.sIsMember(operingSetKey, tempId)){ logger.info("過濾掉redis已操做集合中的數據:id={}",tempId); itea.remove(); } } if(CollectionUtils.isEmpty(idList)){ //數據庫中無符合條件的數據 logger.info("數據庫中一樣無status={}的數據,返回null",status); return res; }else{ try { final String updateLock = getByTenant(update, tenantId, status, isQingDan); if(distributedLock.tryLock(updateLock, 3000)){ logger.info("成功獲取了分分佈式鎖lock={},對redis數據set={}進行更新,更新的內容ids={}:",updateLock,dataSetKey,idList); redisClient.sAdd(dataSetKey, idList.toArray()); }else{ TimeUnit.MILLISECONDS.sleep(500L); } return getPayment(tenantId,userId,status,isQingDan); //數據更新後,再次調用本方法,從新獲取 } catch (DistributedLockException e) { logger.error("獲取分佈式鎖error:"+Utils.getFullErrorMessage(e)); } catch (InterruptedException e) { logger.error("線程沉睡error:"+Utils.getFullErrorMessage(e)); } } }else{ /** 有數據 **/ res = findOne(id); logger.info("獲取了id={}的數據",id); redisClient.sAdd(operingSetKey, id); //記錄正在操做的payment /** 業務邏輯部分,記錄操做人員等 start **/ /** 狀態修改,操做人員記錄 **/ if (PaymentStatus.INPUT_WAIT.getValue().equals(status)){ res = checkDataId(res); res.setEntryBy(userId); res.setStatus(PaymentStatus.INPUT_ING.getValue()); } else if (PaymentStatus.CHECK_WAIT.getValue().equals(status)) { res.setVerifyBy(userId); res.setStatus(PaymentStatus.CHECK_ING.getValue()); } res.setUserId(userId); updateSelective(res); /** 業務邏輯部分,記錄操做人員等 end **/ redisClient.sRem(operingSetKey, id); //數據庫記錄id後,redis中可清掉id } } catch (CacheException e) { logger.error("redis隨機彈出元素時error:"+Utils.getFullErrorMessage(e)); } return res; }