由於疫情影響,面試成爲你們關注的一個重點,今天講解分佈式鎖的相關知識,因此找到幾個對於分佈式鎖面試的時候該怎麼回答java
爲此,將分佈式鎖相關面試知識點進行總結和講解,
從面試出發,帶你學習分佈式鎖node
福利以後,話歸正題,咱們來看一下分佈式鎖面試
你們都知道,若是咱們一臺機器上多個不一樣線程搶佔同一個資源,而且若是屢次執行會有異常,咱們稱之爲非線程安全。通常,咱們爲了解決這種問題,一般使用鎖來解決,像java語言,咱們可使用synchronized。若是是同一臺機器裏面不一樣的java實例,咱們可使用系統的文件讀寫鎖來解決,若是再擴展到不一樣的機器呢?咱們一般用分佈式鎖來解決。redis
分佈式鎖的特色以下:apache
分佈式鎖的實現有不少種,常見的有redis,zookeeper,谷歌的chubby等安全
簡單介紹一下。相信你們這裏已經想到了解決方案,那就是每次執行任務的時候,先查詢redis裏面是否已經有鎖的key,若是沒有就寫入,而後就開始執行任務。架構
這個看起來很對,不過存在什麼問題呢,例如進程A跟進程B同時查詢Redis,他們都發現Redis中沒有對應的值,而後都開始寫入,因爲不是帶版本讀寫,兩我的都寫成功了,都得到了鎖。還好,Redis給咱們提供原子寫入的操做,setnx(SET if Not eXists, 一個命令咱們最好把全稱也瞭解一下,有助於咱們記住這個命令)。分佈式
若是你覺得只要這樣就完成一個分佈式鎖,那就太天真了,咱們不妨考慮一些極端狀況,例如某個線程取到了鎖,可是很不幸,這個機器死機了,那麼這個鎖沒有被釋放,這個任務永遠就不會有人執行了。因此一種比較好的解決方案是,申請鎖的時候,預估一個程序的執行時間,而後給鎖設置一個超時時間,若是超過這個時間其餘人也能取到這個鎖。但這又引起另一個問題,有時候負載很高,任務執行得很慢,結果過了超時時間任務還沒執行完,這個時候又起了另一個任務來執行。ide
架構設計的魅力正是如此,當你解決一個問題的時候,總會引起一些新的問題,須要逐步攻破逐個解決。這種方法,咱們通常能夠在搶佔到鎖以後,就開一個守護線程,定時去redis哪裏詢問,是否是仍是由我搶佔着當前的鎖,還有多久就要過時,若是發現要過時了,就趕忙續期。函數
好了,看到這裏,相信你已經學會了如何用Redis實現一個分佈式鎖服務了
Zookeeper 實現分佈式鎖的示意圖以下:
上圖中左邊是Zookeeper集羣, lock是數據節點,node_1到node_n表示一系列的順序臨時節點,右側client_1到client_n表示要獲取鎖的客戶端。Service是互斥訪問的服務。
下面的源碼是根據Zookeeper的開源客戶端Curator實現分佈式鎖。採用zk的原生API實現會比較複雜,因此這裏就直接用Curator這個輪子,採用Curator的acquire和release兩個方法就能實現分佈式鎖。
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDistributeLock {
public static void main(String\[\] args) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("111.231.83.101:2181",retryPolicy); client.start(); CuratorFramework client2 = CuratorFrameworkFactory.newClient("111.231.83.101:2181",retryPolicy); client2.start(); //建立分佈式鎖, 鎖空間的根節點路徑爲/curator/lock InterProcessMutex mutex = new InterProcessMutex(client,"/curator/lock"); final InterProcessMutex mutex2 = new InterProcessMutex(client2,"/curator/lock"); try { mutex.acquire(); } catch (Exception e) { e.printStackTrace(); } //得到了鎖, 進行業務流程 System.out.println("clent Enter mutex"); Thread client2Th = new Thread(new Runnable() { @Override public void run() { try { mutex2.acquire(); System.out.println("client2 Enter mutex"); mutex2.release(); System.out.println("client2 release lock"); }catch (Exception e){ e.printStackTrace(); } } }); client2Th.start(); //完成業務流程, 釋放鎖 try { Thread.sleep(5000); mutex.release(); System.out.println("client release lock"); client2Th.join(); } catch (Exception e) { e.printStackTrace(); } //關閉客戶端 client.close(); }
}
上述代碼的執行結果以下:
能夠看到client客戶端首先拿到鎖再執行業務,而後再輪到client2嘗試獲取鎖並執行業務。
一直追蹤acquire()的加鎖方法,能夠追蹤到加鎖的核心函數爲attemptLock。
String attemptLock(long time, TimeUnit unit, byte\[\] lockNodeBytes) throws Exception { ..... while ( !isDone ) { isDone = true; try { //建立臨時有序節點 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); //判斷本身是否最小序號的節點,若是不是添加監聽前面節點被刪的通知 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } } //若是獲取鎖返回節點路徑 if ( hasTheLock ) { return ourPath; } .... }
深刻internalLockLoop函數源碼:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { ....... while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { //獲取子節點列表按照序號從小到大排序 List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //判斷本身是不是當前最小序號節點 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { //成功獲取鎖 haveTheLock = true; } else { //拿到前一個節點 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); //若是沒有拿到鎖,調用wait,等待前一個節點刪除時,經過回調notifyAll喚醒當前線程 synchronized(this) { try { //設置監聽器,getData會判讀前一個節點是否存在,不存在就會拋出異常從而不會設置監聽器 client.getData().usingWatcher(watcher).forPath(previousSequencePath); //若是設置了millisToWait,等一段時間,到了時間刪除本身跳出循環 if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } //等待一段時間 wait(millisToWait); } else { //一直等待下去 wait(); } } catch ( KeeperException.NoNodeException e ) { //getData發現前一個子節點被刪除,拋出異常 } } } } } ..... }
採用zk實現分佈式鎖在實際應用中不是很常見,須要一套zk集羣,並且頻繁監聽對zk集羣來講也是有壓力,因此不推薦你們用。不能去面試的時候,能具體說一下使用zk實現分佈式鎖,我想應該也是一個加分項 。
好了,今天就從redis和zookeeper兩個方面實現分佈式鎖,以爲有收穫的小夥伴,歡迎關注+點贊呀須要更多資料的,關注公衆號Java技術聯盟,每日更新哦