咱們在構建分佈式系統的時候,常常須要控制對共享資源的互斥訪問。這個時候咱們就涉及到分佈式鎖(也稱爲全局鎖)的實現,基於目前的各類工具,咱們已經有了大量的實現方式,好比:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul 的Key/Value存儲來實現分佈式鎖以及信號量的方法。html
分佈式鎖實現api
基於Consul的分佈式鎖主要利用Key/Value存儲API中的acquire和release操做來實現。acquire和release操做是相似Check-And-Set的操做:session
- acquire操做只有當鎖不存在持有者時纔會返回true,而且set設置的Value值,同時執行操做的session會持有對該Key的鎖,不然就返回false多線程
- release操做則是使用指定的session來釋放某個Key的鎖,若是指定的session無效,那麼會返回false,不然就會set設置Value值,並返回true併發
具體實現中主要使用了這幾個Key/Value的API:dom
- create session:https://www.consul.io/api/session.html#session_create分佈式
- delete session:https://www.consul.io/api/session.html#delete-sessionide
- KV acquire/release:https://www.consul.io/api/kv.html#create-update-key工具
基本流程單元測試
具體實現
public class Lock {
private static final String prefix = "lock/"; // 同步鎖參數前綴
private ConsulClient consulClient;
private String sessionName;
private String sessionId = null;
private String lockKey;
/**
*
* @param consulClient
* @param sessionName 同步鎖的session名稱
* @param lockKey 同步鎖在consul的KV存儲中的Key路徑,會自動增長prefix前綴,方便歸類查詢
*/
public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
this.consulClient = consulClient;
this.sessionName = sessionName;
this.lockKey = prefix + lockKey;
}
/**
* 獲取同步鎖
*
* @param block 是否阻塞,直到獲取到鎖爲止
* @return
*/
public Boolean lock(boolean block) {
if (sessionId != null) {
throw new RuntimeException(sessionId + " - Already locked!");
}
sessionId = createSession(sessionName);
while(true) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
return true;
} else if(block) {
continue;
} else {
return false;
}
}
}
/**
* 釋放同步鎖
*
* @return
*/
public Boolean unlock() {
PutParams putParams = new PutParams();
putParams.setReleaseSession(sessionId);
boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
consulClient.sessionDestroy(sessionId, null);
return result;
}
/**
* 建立session
* @param sessionName
* @return
*/
private String createSession(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
}
單元測試
下面單元測試的邏輯:經過線程的方式來模擬不一樣的分佈式服務來競爭鎖。多個處理線程同時以阻塞方式來申請分佈式鎖,當處理線程得到鎖以後,Sleep一段隨機事件,以模擬處理業務邏輯,處理完畢以後釋放鎖。
public class TestLock {
private Logger logger = Logger.getLogger(getClass());
@Test
public void testLock() throws Exception {
new Thread(new LockRunner(1)).start();
new Thread(new LockRunner(2)).start();
new Thread(new LockRunner(3)).start();
new Thread(new LockRunner(4)).start();
new Thread(new LockRunner(5)).start();
Thread.sleep(200000L);
}
class LockRunner implements Runnable {
private Logger logger = Logger.getLogger(getClass());
private int flag;
public LockRunner(int flag) {
this.flag = flag;
}
@Override
public void run() {
Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
try {
if (lock.lock(true)) {
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(3000L));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
單元測試執行結果以下:
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start!
2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end!
2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start!
2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end!
2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start!
2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end!
2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start!
2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end!
2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start!
2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end!
從測試結果咱們能夠看到,經過分佈式鎖的形式來控制併發時,多個同步操做只會有一個操做可以被執行,其餘操做只有在等鎖釋放以後纔有機會去執行,因此經過這樣的分佈式鎖,咱們能夠控制共享資源同時只能被一個操做進行執行,以保障數據處理時的分佈式併發問題。
優化建議
本文咱們實現了基於Consul的簡單分佈式鎖,可是在實際運行時,可能會由於各類各樣的意外狀況致使unlock操做沒有獲得正確地執行,從而使得分佈式鎖沒法釋放。因此爲了更完善的使用分佈式鎖,咱們還必須實現對鎖的超時清理等控制,保證即便出現了未正常解鎖的狀況下也能自動修復,以提高系統的健壯性。那麼如何實現呢?請持續關注個人後續分解!
參考文檔
Key/Value的API:https://www.consul.io/api/kv.html
在上面《基於Consul的分佈式鎖實現》中咱們介紹如何基於Consul的KV存儲來實現分佈式互斥鎖。本文將繼續討論基於Consul的分佈式鎖實現。信號量是咱們在實現併發控制時會常用的手段,主要用來限制同時併發線程或進程的數量,好比:Zuul默認狀況下就使用信號量來限制每一個路由的併發數,以實現不一樣路由間的資源隔離。
信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施,是能夠用來保證兩個或多個關鍵代碼段不被併發調用。在進入一個關鍵代碼段以前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。爲了完成這個過程,須要建立一個信號量VI,而後將Acquire Semaphore VI以及Release Semaphore VI分別放置在每一個關鍵代碼段的首末端,確認這些信號量VI引用的是初始建立的信號量。如在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。
實現思路
- 信號量存儲:semaphore/key
- acquired操做:
- 建立session
- 鎖定key競爭者:semaphore/key/session
- 查詢信號量:semaphore/key/.lock,能夠得到以下內容(若是是第一次建立信號量,將獲取不到,這個時候就直接建立)
- 若是持有者已達上限,返回false,若是阻塞模式,就繼續嘗試acquired操做
- 若是持有者未達上限,更新semaphore/key/.lock的內容,將當前線程的sessionId加入到holders中。注意:更新的時候須要設置cas,它的值是「查詢信號量」步驟得到的「ModifyIndex」值,該值用於保證更新操做的基礎沒有被其餘競爭者更新。若是更新成功,就開始執行具體邏輯。若是沒有更新成功,說明有其餘競爭者搶佔了資源,返回false,阻塞模式下繼續嘗試acquired操做
- release操做:
- 從semaphore/key/.lock的holders中移除當前sessionId
- 刪除semaphore/key/session
- 刪除當前的session
流程圖
代碼實現
public class Semaphore {
private Logger logger = Logger.getLogger(getClass());
private static final String prefix = "semaphore/"; // 信號量參數前綴
private ConsulClient consulClient;
private int limit;
private String keyPath;
private String sessionId = null;
private boolean acquired = false;
/**
*
* @param consulClient consul客戶端實例
* @param limit 信號量上限值
* @param keyPath 信號量在consul中存儲的參數路徑
*/
public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
this.consulClient = consulClient;
this.limit = limit;
this.keyPath = prefix + keyPath;
}
/**
* acquired信號量
*
* @param block 是否阻塞。若是爲true,那麼一直嘗試,直到獲取到該資源爲止。
* @return
* @throws IOException
*/
public Boolean acquired(boolean block) throws IOException {
if(acquired) {
logger.error(sessionId + " - Already acquired");
throw new RuntimeException(sessionId + " - Already acquired");
}
// create session
clearSession();
this.sessionId = createSessionId("semaphore");
logger.debug("Create session : " + sessionId);
// add contender entry
String contenderKey = keyPath + "/" + sessionId;
logger.debug("contenderKey : " + contenderKey);
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
if(!b) {
logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
}
while(true) {
// try to take the semaphore
String lockKey = keyPath + "/.lock";
String lockKeyValue;
GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
if (lockKeyContent != null) {
// lock值轉換
lockKeyValue = lockKeyContent.getValue();
BASE64Decoder decoder = new BASE64Decoder();
byte[] v = decoder.decodeBuffer(lockKeyValue);
String lockKeyValueDecode = new String(v);
logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);
Gson gson = new Gson();
ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
// 當前信號量已滿
if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
if(block) {
// 若是是阻塞模式,再嘗試
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
continue;
}
// 非阻塞模式,直接返回沒有獲取到信號量
return false;
}
// 信號量增長
contenderValue.getHolders().add(sessionId);
putParams = new PutParams();
putParams.setCas(lockKeyContent.getModifyIndex());
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if(c) {
acquired = true;
return true;
}
else
continue;
} else {
// 當前信號量尚未,因此建立一個,並立刻搶佔一個資源
ContenderValue contenderValue = new ContenderValue();
contenderValue.setLimit(limit);
contenderValue.getHolders().add(sessionId);
putParams = new PutParams();
putParams.setCas(0L);
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if (c) {
acquired = true;
return true;
}
continue;
}
}
}
/**
* 建立sessionId
* @param sessionName
* @return
*/
public String createSessionId(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
/**
* 釋放session、並從lock中移除當前的sessionId
* @throws IOException
*/
public void release() throws IOException {
if(this.acquired) {
// remove session from lock
while(true) {
String contenderKey = keyPath + "/" + sessionId;
String lockKey = keyPath + "/.lock";
String lockKeyValue;
GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
if (lockKeyContent != null) {
// lock值轉換
lockKeyValue = lockKeyContent.getValue();
BASE64Decoder decoder = new BASE64Decoder();
byte[] v = decoder.decodeBuffer(lockKeyValue);
String lockKeyValueDecode = new String(v);
Gson gson = new Gson();
ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
contenderValue.getHolders().remove(sessionId);
PutParams putParams = new PutParams();
putParams.setCas(lockKeyContent.getModifyIndex());
consulClient.deleteKVValue(contenderKey);
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if(c) {
break;
}
}
}
// remove session key
}
this.acquired = false;
clearSession();
}
public void clearSession() {
if(sessionId != null) {
consulClient.sessionDestroy(sessionId, null);
sessionId = null;
}
}
class ContenderValue implements Serializable {
private Integer limit;
private List<String> holders = new ArrayList<>();
public Integer getLimit() {
return limit;
}
public void setLimit(Integer limit) {
this.limit = limit;
}
public List<String> getHolders() {
return holders;
}
public void setHolders(List<String> holders) {
this.holders = holders;
}
@Override
public String toString() {
return new Gson().toJson(this);
}
}
}
單元測試
下面單元測試的邏輯:經過線程的方式來模擬不一樣的分佈式服務來獲取信號量執行業務邏輯。因爲信號量與簡單的分佈式互斥鎖有所不一樣,它不是隻限定一個線程能夠操做,而是能夠控制多個線程的併發,因此經過下面的單元測試,咱們設置信號量爲3,而後同時啓動15個線程來競爭的狀況,來觀察分佈式信號量實現的結果如何。
public class TestLock {
private Logger logger = Logger.getLogger(getClass());
@Test
public void testSemaphore() throws Exception {
new Thread(new SemaphoreRunner(1)).start();
new Thread(new SemaphoreRunner(2)).start();
new Thread(new SemaphoreRunner(3)).start();
new Thread(new SemaphoreRunner(4)).start();
new Thread(new SemaphoreRunner(5)).start();
new Thread(new SemaphoreRunner(6)).start();
new Thread(new SemaphoreRunner(7)).start();
new Thread(new SemaphoreRunner(8)).start();
new Thread(new SemaphoreRunner(9)).start();
new Thread(new SemaphoreRunner(10)).start();
Thread.sleep(1000000L);
}
}
public class SemaphoreRunner implements Runnable {
private Logger logger = Logger.getLogger(getClass());
private int flag;
public SemaphoreRunner(int flag) {
this.flag = flag;
}
@Override
public void run() {
Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
try {
if (semaphore.acquired(true)) {
// 獲取到信號量,執行業務邏輯
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(10000));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 信號量釋放、Session鎖釋放、Session刪除
semaphore.release();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
執行結果:
INFO [Thread-6] SemaphoreRunner - Thread 7 start!
INFO [Thread-2] SemaphoreRunner - Thread 3 start!
INFO [Thread-7] SemaphoreRunner - Thread 8 start!
INFO [Thread-2] SemaphoreRunner - Thread 3 end!
INFO [Thread-5] SemaphoreRunner - Thread 6 start!
INFO [Thread-6] SemaphoreRunner - Thread 7 end!
INFO [Thread-9] SemaphoreRunner - Thread 10 start!
INFO [Thread-5] SemaphoreRunner - Thread 6 end!
INFO [Thread-1] SemaphoreRunner - Thread 2 start!
INFO [Thread-7] SemaphoreRunner - Thread 8 end!
INFO [Thread-10] SemaphoreRunner - Thread 11 start!
INFO [Thread-10] SemaphoreRunner - Thread 11 end!
INFO [Thread-12] SemaphoreRunner - Thread 13 start!
INFO [Thread-1] SemaphoreRunner - Thread 2 end!
INFO [Thread-3] SemaphoreRunner - Thread 4 start!
INFO [Thread-9] SemaphoreRunner - Thread 10 end!
INFO [Thread-0] SemaphoreRunner - Thread 1 start!
INFO [Thread-3] SemaphoreRunner - Thread 4 end!
INFO [Thread-14] SemaphoreRunner - Thread 15 start!
INFO [Thread-12] SemaphoreRunner - Thread 13 end!
INFO [Thread-0] SemaphoreRunner - Thread 1 end!
INFO [Thread-13] SemaphoreRunner - Thread 14 start!
INFO [Thread-11] SemaphoreRunner - Thread 12 start!
INFO [Thread-13] SemaphoreRunner - Thread 14 end!
INFO [Thread-4] SemaphoreRunner - Thread 5 start!
INFO [Thread-4] SemaphoreRunner - Thread 5 end!
INFO [Thread-8] SemaphoreRunner - Thread 9 start!
INFO [Thread-11] SemaphoreRunner - Thread 12 end!
INFO [Thread-14] SemaphoreRunner - Thread 15 end!
INFO [Thread-8] SemaphoreRunner - Thread 9 end!
從測試結果,咱們能夠發現當信號量持有者數量達到信號量上限3的時候,其餘競爭者就開始進行等待了,只有當某個持有者釋放信號量以後,纔會有新的線程變成持有者,從而開始執行本身的業務邏輯。因此,分佈式信號量能夠幫助咱們有效的控制同時操做某個共享資源的併發數。
優化建議與參考文檔
同前文同樣,這裏只是作了簡單的實現。線上應用還必須加入TTL的session清理以及對.lock資源中的無效holder進行清理的機制。
參考文檔:
https://www.consul.io/docs/guides/semaphore.html
轉自:http://mp.weixin.qq.com/s?__biz=MzAxODcyNjEzNQ==&mid=2247483857&idx=1&sn=495c0faad9bc237132aca49e722022ec&chksm=9bd0ac49aca7255fec67f9364fab63638b30e7a69fc0771f5977a6cc9a38856879b64832bc67&scene=21#wechat_redirect