https://mp.weixin.qq.com/s/GydyNbaKkC301X1qjVGREghtml
咱們在構建分佈式系統的時候,常常須要控制對共享資源的互斥訪問。這個時候咱們就涉及到分佈式鎖(也稱爲全局鎖)的實現,基於目前的各類工具,咱們已經有了大量的實現方式,好比:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul 的Key/Value存儲來實現分佈式鎖以及信號量的方法。
基於Consul的分佈式鎖主要利用Key/Value存儲API中的acquire和release操做來實現。acquire和release操做是相似Check-And-Set的操做:api
acquire操做只有當鎖不存在持有者時纔會返回true,而且set設置的Value值,同時執行操做的session會持有對該Key的鎖,不然就返回false微信
具體實現中主要使用了這幾個Key/Value的API:session
public class Lock { private static final String prefix = "lock/"; // 同步鎖參數前綴 private ConsulClient consulClient; private String sessionName; private String sessionId = null; private String lockKey; /** * * @param consulClient * @param sessionName 同步鎖的session名稱 * @param lockKey 同步鎖在consul的KV存儲中的Key路徑,會自動增長prefix前綴,方便歸類查詢 */ public Lock(ConsulClient consulClient, String sessionName, String lockKey) { this.consulClient = consulClient; this.sessionName = sessionName; this.lockKey = prefix + lockKey; } /** * 獲取同步鎖 * * @param block 是否阻塞,直到獲取到鎖爲止 * @return */ public Boolean lock(boolean block) { if (sessionId != null) { throw new RuntimeException(sessionId + " - Already locked!"); } sessionId = createSession(sessionName); while(true) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { return true; } else if(block) { continue; } else { return false; } } } /** * 釋放同步鎖 * * @return */ public Boolean unlock() { PutParams putParams = new PutParams(); putParams.setReleaseSession(sessionId); boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); consulClient.sessionDestroy(sessionId, null); return result; } /** * 建立session * @param sessionName * @return */ private String createSession(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } }
下面單元測試的邏輯:經過線程的方式來模擬不一樣的分佈式服務來競爭鎖。多個處理線程同時以阻塞方式來申請分佈式鎖,當處理線程得到鎖以後,Sleep一段隨機事件,以模擬處理業務邏輯,處理完畢以後釋放鎖。併發
public class TestLock { private Logger logger = Logger.getLogger(getClass()); @Test public void testLock() throws Exception { new Thread(new LockRunner(1)).start(); new Thread(new LockRunner(2)).start(); new Thread(new LockRunner(3)).start(); new Thread(new LockRunner(4)).start(); new Thread(new LockRunner(5)).start(); Thread.sleep(200000L); } class LockRunner implements Runnable { private Logger logger = Logger.getLogger(getClass()); private int flag; public LockRunner(int flag) { this.flag = flag; } @Override public void run() { Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key"); try { if (lock.lock(true)) { logger.info("Thread " + flag + " start!"); Thread.sleep(new Random().nextInt(3000L)); logger.info("Thread " + flag + " end!"); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
單元測試執行結果以下:dom
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start! 2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end! 2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start! 2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end! 2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start! 2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end! 2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start! 2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end! 2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start! 2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end!
從測試結果咱們能夠看到,經過分佈式鎖的形式來控制併發時,多個同步操做只會有一個操做可以被執行,其餘操做只有在等鎖釋放以後纔有機會去執行,因此經過這樣的分佈式鎖,咱們能夠控制共享資源同時只能被一個操做進行執行,以保障數據處理時的分佈式併發問題。分佈式
本文咱們實現了基於Consul的簡單分佈式鎖,可是在實際運行時,可能會由於各類各樣的意外狀況致使unlock操做沒有獲得正確地執行,從而使得分佈式鎖沒法釋放。因此爲了更完善的使用分佈式鎖,咱們還必須實現對鎖的超時清理等控制,保證即便出現了未正常解鎖的狀況下也能自動修復,以提高系統的健壯性。那麼如何實現呢?請持續關注個人後續分解!ide
Key/Value的API:https://www.consul.io/api/kv.html工具
版權聲明單元測試
本文采用 CC BY 3.0 CN協議 進行許可。 可自由轉載、引用,但需署名做者且註明文章出處。如轉載至微信公衆號,請在文末添加做者公衆號二維碼。
長按指紋
一鍵關注