目前幾乎全部的大型網站及應用都是採用分佈式部署的方式,分佈式系統開發帶來的優勢不少,高可用,高併發,水平擴展,分開部署等。但分佈式的開發也帶來了一些新問題,有的時候,咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了不少併發處理相關的API ,也就是咱們常說的「鎖」(如synchronized,lock),可是這些API在分佈式場景中就無能爲力了,也就是說Java沒有提供分佈式鎖的功能。html
基於分佈式鎖的實現有多種方案,常見的有基於數據庫自己的鎖來實現,或者基於zookeeper的API實現,或者是基於緩存來實現分佈式鎖等等,這些方案都各有可取之處,今天咱們介紹的是基於redis的緩存實現分佈式鎖的方案,你們若是對其餘方案有興趣的能夠上網搜索研究。git
redis是基於key-value的一種NoSql數據庫,普遍應用於分佈式的應用中,通常用於放置緩存數據。安裝的方法也比較簡單,樓主安裝的是windows版本的,選擇最新的zip版,下載完以後直接解壓便可。下載地址:https://github.com/MicrosoftArchive/redis/tagsgithub
redis中有一個命令setnx (SET IF NOT EXISTS) , 若是不存在,就設置key,將 key
的值設爲 value
,當且僅當 key 不存在。若給定的 key
已經存在,則 SETNX 不作任何動做。基於這個特性,咱們能夠對須要鎖住的對象加上key,這樣,同一時間就只能有一個線程擁有這把鎖,從而達到分佈式鎖的效果。下面用一個具體的Java實例來展現redis的分佈式鎖效果。redis
Java操做redis須要用到第三方的庫類,因此先在pom.xml中引入依賴。數據庫
加入依賴後,作一個redis的工具方法,分別實現的是加鎖和解鎖的功能。windows
public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; /** * 加鎖 * * @param key * @param value 當前時間+超時時間 * @return */ public boolean lock(String key, String value) { //至關於setnx命令 if (redisTemplate.opsForValue().setIfAbsent(key, value)) { return true; } //下面的這段代碼是判斷以前加的鎖是否超時,是的話就更新,必定要加這段代碼 //否則就有可能出現死鎖。 String currentValue = redisTemplate.opsForValue().get(key); //若是鎖過時 if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) { //獲取上一個鎖的時間,這段代碼的判斷是防止多線程進入這裏,只會有一個線程拿到鎖 String oldValue = redisTemplate.opsForValue().getAndSet(key, value); if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) { return true; } } return false; } /** * 解鎖 * * @param key * @param value */ public void unLock(String key, String value) { try { String currentValue = redisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) { redisTemplate.opsForValue().getOperations().delete(key); } } catch (Exception e) { log.error("【redis分佈式鎖】 解鎖異常,{}", e); } } }
如今,咱們模擬一個下單的場景,假設有一個秒殺的活動,同一時間有多個線程對同一個產品進行訪問,而後分別看看加鎖和沒加鎖的結果來作對比。下面是秒殺的模擬代碼:瀏覽器
public class SecKillController { @Autowired private SecKillService secKillService; /** * 查詢秒殺活動特價商品的信息 * @param productId * @return * @throws Exception */ @GetMapping("/query/{productId}") public String query(@PathVariable String productId) throws Exception{ return secKillService.querySecKillProductInfo(productId); } /** * 秒殺的方法 * @param productId * @return * @throws Exception */ @GetMapping("/order/{productId}") public String skill(@PathVariable String productId) throws Exception{ log.info("@skill request ,productId:" +productId); secKillService.orderProductKill(productId); return secKillService.querySecKillProductInfo(productId); } }
public class SecKillServiceImpl implements SecKillService { private static final int TIME_OUT = 1 * 1000; @Autowired private RedisLock redisLock; static Map<String, Integer> products; static Map<String, Integer> stock; static Map<String, String> orders; static { /** * 模擬多個表,商品信息表,庫存表,秒殺成功訂單表 */ products = new HashMap<>(); stock = new HashMap<>(); orders = new HashMap<>(); products.put("123", 100000); stock.put("123", 100000); } /** * @param productId 訂單id * @return */ private String queryMap(String productId) { return "限量份數" + products.get(productId) + "還剩:" + stock.get(productId) + "份" + "該商品成功下單用戶數目:" + orders.size() + "人"; } @Override public String querySecKillProductInfo(String productId) { return this.queryMap(productId); } @Override public void orderProductKill(String productId) { //1.查詢該商品庫存,爲0則活動結束 int stockNum = stock.get(productId); if (stockNum == 0) { throw new RuntimeException("活動結束"); } else { //2.下單(模擬不一樣用戶openid不一樣) orders.put(KeyUtil.getUniqueKey(), productId); //3.減庫存 stockNum = stockNum - 1; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } stock.put(productId, stockNum); } } }
先模擬沒加鎖的下單狀態,咱們開啓工程後,用Apache ab做爲壓測工具來模擬高併發訪問過程緩存
在瀏覽器上訪問查詢後的訂單數量,結果顯示以下:多線程
能夠看到,再高併發的訪問環境下,若是咱們沒有對訂單作鎖的處理,那麼就可能出現數據的紊亂,致使結果不對應,這顯然不符合咱們的需求,下面咱們來看看加上redis鎖以後的訪問狀況,先把service中的秒殺代碼加上鎖。併發
@Override public void orderProductKill(String productId) { //加鎖,保證下面的代碼單線程的訪問 long time = System.currentTimeMillis() + TIME_OUT; if (!redisLock.lock(productId, String.valueOf(time))) { throw new RuntimeException( "下單失敗"); } //1.查詢該商品庫存,爲0則活動結束 int stockNum = stock.get(productId); if (stockNum == 0) { throw new RuntimeException("活動結束"); } else { //2.下單(模擬不一樣用戶openid不一樣) orders.put(KeyUtil.getUniqueKey(), productId); //3.減庫存 stockNum = stockNum - 1; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } stock.put(productId, stockNum); } //解鎖 redisLock.unLock(productId, String.valueOf(time)); }
而後再進行一樣的操做
咱們能夠看到,加上鎖以後的訂單處理數量是正確的,也就是redis鎖是起到了做用的,這是符合咱們的需求的。
上面的例子相對比較簡單,由於精力能力有限,樓主無法給你們展現真正的分佈式鎖的實現效果,但從原理上實際上是同樣的,都是用redis的setnx命令來加上鎖,保證分佈式環境下鎖住的對象只能被一個線程訪問,並且從實現方式上來講也比較簡單 (只須要一個命令就行,很深刻人心 ) ,所以,redis在分佈式鎖的應用中也被普遍使用。
本文分享 CSDN - 鄙人薛某。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。