分佈式鎖,是指在分佈式的集羣環境中,保證不一樣節點的線程同步執行。redis
分佈式鎖的實現有哪些?算法
1.Memcached分佈式鎖spring
利用Memcached的add命令。此命令是原子性操做,只有在key不存在的狀況下,才能add成功,也就意味着線程獲得了鎖。apache
2.Redis分佈式鎖app
和Memcached的方式相似,利用Redis的setnx命令。此命令一樣是原子性操做,只有在key不存在的狀況下,才能set成功。(setnx命令並不完善,後續會介紹替代方案)分佈式
3.Zookeeper分佈式鎖ide
利用Zookeeper的順序臨時節點,來實現分佈式鎖和等待隊列。Zookeeper設計的初衷,就是爲了實現分佈式鎖服務的。函數
4.Chubbyspring-boot
Google公司實現的粗粒度分佈式鎖服務,底層利用了Paxos一致性算法。學習
這麼多種實現方法,選擇比較有表明性的Redis的分佈式鎖來學習:
如何用Redis實現分佈式鎖?
Redis分佈式鎖的基本流程並不難理解,但要想寫得盡善盡美,也並非那麼容易。在這裏,咱們須要先了解分佈式鎖實現的三個核心要素:
1.加鎖
最簡單的方法是使用setnx命令。key是鎖的惟一標識,按業務來決定命名。好比想要給一種商品的秒殺活動加鎖,能夠給key命名爲 「lock_sale_商品ID」 。而value設置成什麼呢?咱們能夠姑且設置成1。加鎖的僞代碼以下:
setnx(key,1)
當一個線程執行setnx返回1,說明key本來不存在,該線程成功獲得了鎖;當一個線程執行setnx返回0,說明key已經存在,該線程搶鎖失敗。
2.解鎖
有加鎖就得有解鎖。當獲得鎖的線程執行完任務,須要釋放鎖,以便其餘線程能夠進入。釋放鎖的最簡單方式是執行del指令,僞代碼以下:
del(key)
釋放鎖以後,其餘線程就能夠繼續執行setnx命令來得到鎖。
3.鎖超時
鎖超時是什麼意思呢?若是一個獲得鎖的線程在執行任務的過程當中掛掉,來不及顯式地釋放鎖,這塊資源將會永遠被鎖住,別的線程再也別想進來。
因此,setnx的key必須設置一個超時時間,以保證即便沒有被顯式釋放,這把鎖也要在必定時間後自動釋放。setnx不支持超時參數,因此須要額外的指令,僞代碼以下:
expire(key, 30)
模擬此場景,寫一個搶購秒殺的demo:
Controller:
@RestController @RequestMapping("/skill") @Slf4j public class SecKillController { @Autowired private SecKillService secKillService;
/** * 秒殺,沒有搶到得到"哎呦喂,xxxxx",搶到了會返回剩餘的庫存量 * @param productId * @return * @throws Exception */ @GetMapping("/order/{productId}") public String skill(@PathVariable String productId)throws Exception { log.info("@skill request, productId:" + productId); secKillService.orderProductMockDiffUser(productId); return secKillService.querySecKillProductInfo(productId); } }
業務層Impl:(未作任何同步處理)
@Service public class SecKillServiceImpl implements SecKillService { private static final int TIMEOUT = 10 * 1000; //超時時間 10s /** * 國慶活動,皮蛋粥特價,限量100000份 */ static Map<String,Integer> products; static Map<String,Integer> stock; static Map<String,String> orders; static { /** * 模擬多個表,商品信息表,庫存表,秒殺成功訂單表 */ products = new HashMap<>(); stock = new HashMap<>(); orders = new HashMap<>(); products.put("123456", 100000); stock.put("123456", 100000); } private String queryMap(String productId) { return "國慶活動,皮蛋粥特價,限量份" + products.get(productId) +" 還剩:" + stock.get(productId)+" 份" +" 該商品成功下單用戶數目:" + orders.size() +" 人" ; } @Override public String querySecKillProductInfo(String productId) { return this.queryMap(productId); } @Override public void orderProductMockDiffUser(String productId) { Long time = System.currentTimeMillis() + TIMEOUT; //1.查詢該商品庫存,爲0則活動結束。 int stockNum = stock.get(productId); if(stockNum == 0) { throw new SellException(100,"活動結束"); }else { //2.下單(模擬不一樣用戶openid不一樣) orders.put(KeyUtil.getUniqueKey(),productId); //3.減庫存 stockNum =stockNum-1; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } stock.put(productId,stockNum); } } }
啓動項目,而後使用apache bench 壓測:ab -n 100 -c 100 http://localhost:8080/skill/order/123456
發現數據同步失敗:
接下來嘗試在函數加上 synchronized,同步沒問題,可是響應時間較長
使用Redis分佈式鎖:(須要引入 spring-boot-starter-data-redis 相關依賴)
RedisLock類:
@Component @Slf4j public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; /** * 加鎖 * @param key * @param value * @return */ public boolean lock(String key, String value){ // 設置redis值,若是值已存在不作操做,跳到下一步 if (redisTemplate.opsForValue().setIfAbsent(key, value)) { return true; } // 獲取reids中的時間戳 String currentValue = redisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) { // 拿到上一次的時間戳,並設置新的時間戳,保證只有一個線程能同步 String oldValue = redisTemplate.opsForValue().getAndSet(key, value); // 若第二個線程進來,此時oldvalue已經不等於currentValue了 if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) { return true; } } return false; } /** * 解鎖 * @param key * @param value */ public void unLock(String key, String value) { try { String currentValue = redisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(currentValue) && value.equals(currentValue)) { redisTemplate.opsForValue().getOperations().delete(key); } } catch (Exception e) { log.error("【redis分佈式鎖】解鎖異常, {}", e); } } }
業務層Impl:(加上Redis鎖的處理)
@Service public class SecKillServiceImpl implements SecKillService { private static final int TIMEOUT = 10 * 1000; //超時時間 10s @Autowired private RedisLock redisLock; /** * 國慶活動,皮蛋粥特價,限量100000份 */ static Map<String,Integer> products; static Map<String,Integer> stock; static Map<String,String> orders; static { /** * 模擬多個表,商品信息表,庫存表,秒殺成功訂單表 */ products = new HashMap<>(); stock = new HashMap<>(); orders = new HashMap<>(); products.put("123456", 100000); stock.put("123456", 100000); } private String queryMap(String productId) { return "國慶活動,皮蛋粥特價,限量份" + products.get(productId) +" 還剩:" + stock.get(productId)+" 份" +" 該商品成功下單用戶數目:" + orders.size() +" 人" ; } @Override public String querySecKillProductInfo(String productId) { return this.queryMap(productId); } @Override public void orderProductMockDiffUser(String productId) { Long time = System.currentTimeMillis() + TIMEOUT; //加鎖 if (!redisLock.lock(productId, String.valueOf(time))) { throw new SellException(101, "人太多了歇一會吧!"); } //1.查詢該商品庫存,爲0則活動結束。 int stockNum = stock.get(productId); if(stockNum == 0) { throw new SellException(100,"活動結束"); }else { //2.下單(模擬不一樣用戶openid不一樣) orders.put(KeyUtil.getUniqueKey(),productId); //3.減庫存 stockNum =stockNum-1; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } stock.put(productId,stockNum); } //解鎖 redisLock.unLock(productId, String.valueOf(time)); } }
重啓,再次用apache bench壓測 ab -n 100 -c 100 http://localhost:8080/skill/order/123456
結果,響應時間很是快,減小了卡頓,同步也正常!