微信公衆號:內核小王子
關注可瞭解更多關於數據庫,JVM內核相關的知識;
若是你有任何疑問也能夠加我pigpdong[^1]java
在多線程狀況下訪問資源,咱們須要加鎖來保證業務的正常進行,JDK中提供了不少併發控制相關的工具包,來保證多線程下能夠高效工做,一樣在分佈式環境下,有些互斥操做咱們能夠藉助分佈式鎖來實現兩個操做不能同時運行,必須等到另一個任務結束了把鎖釋放了才能獲取鎖而後執行,由於跨JVM咱們須要一個第三方系統來協助實現分佈式鎖,通常咱們能夠用
數據庫,redis,zookeeper,etcd等來實現.node
要實現一把分佈式鎖,咱們須要先分析下這把鎖有哪些特性mysql
1.在分佈式集羣中,也就是不一樣的JVM中,相互有衝突的方法,能夠是不一樣JVM相同實例內的同一個方法,也能夠是不一樣方法,也就是不一樣業務間的隔離和同一個業務操做不能並行運行,而分佈式鎖須要保證這兩個方法在同一時間只能有一個運行.redis
2.這把鎖最好是可重入的,由於不可重入的鎖很容易出現死鎖sql
3.獲取鎖和釋放鎖的性能要很高數據庫
4.支持獲取鎖的時候能夠阻塞等待,以及等待時間微信
5.獲取鎖後支持設置一個期限,超過這個期限能夠自動釋放,防止程序沒有本身釋放的狀況多線程
6.這是一把輕量鎖,對業務侵入小併發
7.易用dom
因爲數據庫的鎖無能是在性能高可用上都不及其餘方式,這裏咱們簡單介紹下可能的方案
SET resource_name my_random_value NX PX 30000
咱們能夠看看下面獲取分佈式鎖的使用場景,假設咱們釋放鎖,直接del這個key
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分佈式併發鎖獲取失敗"); return ; } try { // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); }
爲了解決以上問題,咱們能夠在釋放鎖的時候,判斷下鎖是否存在,這樣進程A在釋放鎖的時候就不會將進程B加的鎖釋放了,
或者經過如下方式,將過時時間作爲value存儲在對應的key中,釋放鎖的時候,判斷當前時間是否小於過時時間,只有小於當前時間才處理,咱們也能夠在進行del操做的時候判斷下對應的value是否相等,這個時候就須要在del操做的時候傳人
my_random_value
下面咱們看下redis實現分佈式鎖java代碼實現,咱們採用在del的時候判斷下當前時間是否小於過時時間
public boolean acquireLock(String lockKey, long expired) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = String.valueOf(System.currentTimeMillis() + expired + 1); int tryTimes = 0; while (tryTimes++ < 3) { /* * 1. 嘗試鎖 * setnx : set if not exist */ if (jedis.setnx(lockKey, value).equals(1L)) { return true; } /* * 2. 已經被別的線程鎖住,判斷是否失效 */ String oldValue = jedis.get(lockKey); if (StringUtils.isBlank(oldValue)) { /* * 2.1 value存的是超時時間,若是爲空有2種狀況 * 1. 異常數據,沒有value 或者 value爲空字符 * 2. 鎖剛好被別的線程釋放了 * 此時須要嘗試從新嘗試,爲了不出現狀況1時致使死循環,只重試3次 */ continue; } Long oldValueL = Long.valueOf(oldValue); if (oldValueL < System.currentTimeMillis()) { /* * 已超時,從新嘗試鎖 * * Redis:getSet 操做步驟: * 1.獲取 Key 對應的 Value 做爲返回值,不存在時返回null * 2.設置 Key 對應的 Value 爲傳入的值 * 這裏若是返回的 getValue != oldValue 表示已經被其它線程從新修改了 */ String getValue = jedis.getSet(lockKey, value); return oldValue.equals(getValue); } else { // 未超時,則直接返回失敗 return false; } } return false; } catch (Throwable e) { logger.error("acquireLock error", e); return false; } finally { returnResource(jedis); } } /** * 釋放鎖 * * @param lockKey * key */ public void releaseLock(String lockKey) { ShardedJedis jedis = null; try { jedis = pool.getResource(); long current = System.currentTimeMillis(); // 避免刪除非本身獲取到的鎖 String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } }
這種方式沒有用到剛剛說的my_random_value,咱們看下若是咱們按如下代碼獲取鎖會有什麼問題
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分佈式併發鎖獲取失敗"); return ; } try { boolean locked = redisComponent.acquireLock(lockKey); if(locked) // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); }
一樣這種方式當進程A沒有獲取到鎖,以後進程B獲取到鎖,進程A會釋放進程B的鎖,這個時候咱們能夠藉助my_random_value來實現
/** * 釋放鎖 * * @param lockKey ,value */ public void releaseLock(String lockKey, long oldvalue) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } }
這種方式須要保存以前獲取鎖時候的value值,並在釋放鎖的帶上value值,不過這種實現方式,value的值爲過時時間也不惟一
因爲咱們用了redis得超時機制來釋放鎖,那麼當進程在鎖租約到期後尚未執行結束,那麼其餘進程獲取到鎖後則會產生併發寫的狀況,這種若是業務上須要精確控制,只能用樂觀鎖來控制了,每次寫入數據都帶一個鎖的版本,若是下次獲取鎖的時候版本加1,這樣上面那種狀況,鎖到期釋放了新的進程獲取到鎖後會使用新的版本號,以前的進程鎖已經釋放了若是繼續使用該鎖則會發現版本已經不對了
能夠藉助zookeeper的順序節點,在一個父節點下,全部須要爭搶鎖的資源都去這個目錄下建立一個順序節點,而後判斷這個臨時順序節點是不是兄弟節點中順序最小的,若是是最小的則獲取到鎖,若是不是則監聽這個順序最小的節點的刪除事件,而後在繼續根據這個流程獲取最小節點
public void lock() { try { // 建立臨時子節點 String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(j.join(Thread.currentThread().getName() + myNode, "created")); // 取出全部子節點 List<String> subNodes = zk.getChildren(root, false); TreeSet<String> sortedNodes = new TreeSet<>(); for(String node :subNodes) { sortedNodes.add(root +"/" +node); } String smallNode = sortedNodes.first(); String preNode = sortedNodes.lower(myNode); if (myNode.equals( smallNode)) { // 若是是最小的節點,則表示取得鎖 System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock")); this.nodeId.set(myNode); return; } CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同時註冊監聽。 // 判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽 if (stat != null) { System.out.println(j.join(Thread.currentThread().getName(), myNode, " waiting for " + root + "/" + preNode + " released lock")); latch.await();// 等待,這裏應該一直等待其餘線程釋放鎖 nodeId.set(myNode); latch = null; } } catch (Exception e) { throw new RuntimeException(e); } } public void unlock() { try { System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock ")); if (null != nodeId) { zk.delete(nodeId.get(), -1); } nodeId.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
固然若是咱們開發環境使用的是etcs也能夠用etcd來實現分佈式鎖,原理和zookeeper相似